Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11
版本开始就已经用的是这个 calcite 版本了。

所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个
issue 来报一个 bug。

PS: 
上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。

[1] https://issues.apache.org/jira/browse/CALCITE-3531

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道:
>
> 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite 
> 中修复了,https://github.com/apache/calcite/pull/1602/files
> 但是,flink 中引用的 calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。
>
>
>
>
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2024年5月20日(星期一) 上午10:32
> 收件人:"user-zh"
> 主题:Re: flinksql 经过优化后,group by字段少了
>
>
>
> 看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。
>
> 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid 于2024年5月19日周日 01:01写道:
> 
>  create view tmp_view as
>  SELECT
>  dt, -- 2
>  uid, -- 0
>  uname, -- 1
>  uage -- 3
>  from
>  kafkaTable
>  where dt = cast(CURRENT_DATE as string);
> 
>  insert into printSinkTable
>  select
>  dt, uid, uname, sum(uage)
>  from tmp_view
>  group by
>  dt,
>  uid,
>  uname;
> 
> 
> 
>  sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。
>  但是,经过优化后,生成的 物理结构如下:
>  == Optimized Execution Plan ==
>  Sink(table=[default_catalog.default_database.printSinkTable], 
> fields=[dt, uid, uname, EXPR$3])
>  +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>  nbsp; nbsp;+- GroupAggregate(groupBy=[uid, uname], 
> select=[uid, uname, SUM(uage) AS EXPR$3])
>  nbsp; nbsp; nbsp; +- Exchange(distribution=[hash[uid, 
> uname]])
>  nbsp; nbsp; nbsp; nbsp; nbsp;+- 
> Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; +- 
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], 
> fields=[uid, uname, dt, uage])
> 
> 
> 
>  请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
看起来像是因为 "dt = cast(CURRENT_DATE  as string)" 推导 dt 这个字段是个常量,进而被优化掉了。

将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道:
>
> create view tmp_view as
> SELECT
> dt, -- 2
> uid, -- 0
> uname, -- 1
> uage -- 3
> from
> kafkaTable
> where dt = cast(CURRENT_DATE  as string);
>
> insert into printSinkTable
> select
> dt, uid, uname, sum(uage)
> from tmp_view
> group by
> dt,
> uid,
> uname;
>
>
>
> sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。
> 但是,经过优化后,生成的 物理结构如下:
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, 
> uid, uname, EXPR$3])
> +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>  +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, 
> SUM(uage) AS EXPR$3])
>+- Exchange(distribution=[hash[uid, uname]])
> +- Calc(select=[uid, uname, uage], 
> where=[(dt = CAST(CURRENT_DATE()))])
>   +- 
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], 
> fields=[uid, uname, dt, uage])
>
>
>
> 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢



-- 

Best,
Benchao Li


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 文章 Benchao Li
Congratulations! And thanks to all release managers and everyone
involved in this release!

Yubin Li  于2024年3月18日周一 18:11写道:
>
> Congratulations!
>
> Thanks to release managers and everyone involved.
>
> On Mon, Mar 18, 2024 at 5:55 PM Hangxiang Yu  wrote:
> >
> > Congratulations!
> > Thanks release managers and all involved!
> >
> > On Mon, Mar 18, 2024 at 5:23 PM Hang Ruan  wrote:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Hang
> > >
> > > Paul Lam  于2024年3月18日周一 17:18写道:
> > >
> > > > Congrats! Thanks to everyone involved!
> > > >
> > > > Best,
> > > > Paul Lam
> > > >
> > > > > 2024年3月18日 16:37,Samrat Deb  写道:
> > > > >
> > > > > Congratulations !
> > > > >
> > > > > On Mon, 18 Mar 2024 at 2:07 PM, Jingsong Li 
> > > > wrote:
> > > > >
> > > > >> Congratulations!
> > > > >>
> > > > >> On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote:
> > > > >>>
> > > > >>> Congratulations, thanks for the great work!
> > > > >>>
> > > > >>> Best,
> > > > >>> Rui
> > > > >>>
> > > > >>> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
> > > > >> wrote:
> > > > >>>>
> > > > >>>> The Apache Flink community is very happy to announce the release of
> > > > >> Apache Flink 1.19.0, which is the fisrt release for the Apache Flink
> > > > 1.19
> > > > >> series.
> > > > >>>>
> > > > >>>> Apache Flink® is an open-source stream processing framework for
> > > > >> distributed, high-performing, always-available, and accurate data
> > > > streaming
> > > > >> applications.
> > > > >>>>
> > > > >>>> The release is available for download at:
> > > > >>>> https://flink.apache.org/downloads.html
> > > > >>>>
> > > > >>>> Please check out the release blog post for an overview of the
> > > > >> improvements for this bugfix release:
> > > > >>>>
> > > > >>
> > > >
> > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> > > > >>>>
> > > > >>>> The full release notes are available in Jira:
> > > > >>>>
> > > > >>
> > > >
> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
> > > > >>>>
> > > > >>>> We would like to thank all contributors of the Apache Flink
> > > community
> > > > >> who made this release possible!
> > > > >>>>
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Yun, Jing, Martijn and Lincoln
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.



-- 

Best,
Benchao Li


Re: 急 [FLINK-34170] 何时能够修复?

2024-03-14 文章 Benchao Li
FLINK-34170 只是一个UI的展示问题,并不影响实际的运行。

JDBC Connector 维表下推的 filter 不生效问题,已经在 FLINK-33365 中修复了,最新的 JDBC
Connector 版本中已经带上了这个修复,你可以试一下~

casel.chen  于2024年3月15日周五 10:39写道:
>
> 我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major 
> issue什么时候在哪个版本后能够修复呢?谢谢!
>
>
> select xxx from kafka_table as kt
> left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt
> on kt.trans_id=pt.trans_id and pt.trans_date = 
> DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd');
>
>
> phoenix表主键是 trans_id + trans_date 
> 复合主键,实际作业运行发现flink只会带trans_id字段对phoenix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤
>
>
> https://issues.apache.org/jira/browse/FLINK-34170



-- 

Best,
Benchao Li


Re: RE: lock up表过滤条件下推导致的bug

2023-12-25 文章 Benchao Li
这个问题应该是跟 FLINK-33365[1] 中说的是同一个问题,这个已经在修复中了,在最新的 JDBC Connector 版本中会修复它。

[1] https://issues.apache.org/jira/browse/FLINK-33365

杨光跃  于2023年12月26日周二 09:25写道:
>
>
>
>
>
>
>
> CompiledPlan plan = env.compilePlanSql("insert into out_console " +
> " select r.apply_id from t_purch_apply_sent_route r " +
> " left join t_purch_apply_sent_route_goods FOR SYSTEM_TIME AS OF r.pt as  t " 
> +
> "ON t.apply_id = r.apply_id and t.isdel = r.isdel" +
> " where r.apply_id = 61558439941351 and  t.route_goods_id is not null and 
> t.is_change = 2 " );
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-12-25 20:46:36,"Jiabao Sun"  写道:
> >Hi,
> >
> >邮件中的图片没显示出来,麻烦把 SQL 贴出来一下。
> >
> >Best,
> >Jiabao
> >
> >
> >On 2023/12/25 12:22:41 杨光跃 wrote:
> >> 我的sql如下:
> >> 、
> >>
> >>
> >> t_purch_apply_sent_route 是通过flink cdc创建的
> >> t_purch_apply_sent_route_goods 是普通的jdbc
> >> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据
> >> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推
> >> 这应该算是bug吧,或者要满足我的预期,该怎么写sql?
> >>
> >>
> >>
> >>



-- 

Best,
Benchao Li


Re: Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-30 文章 Benchao Li
hiveserver2 endpoint 就是让 flink gateway 直接变成 hive server2,对外来讲它就是 hive
server2 了,它可以直接跟已有的跟 hive server2 的工具配合一起使用。

但是现在你其实用的是 flink jdbc driver,这个并不是跟 hive server2 交互,它就是跟 flink gateway
交互,所以你用hive server2的模式启动,它就不认识了。

casel.chen  于2023年10月30日周一 14:36写道:
>
> 果然不指定endpoint为hiveserver2类型后使用hive beeline工具连接上了。感谢!
> 不过我仍然有个疑问,看官网文档上有写提供 hiveserver2 endpoint 
> 是为了兼容hive方言,按理也应该可以使用beeline连接上,因为原本beeline支持连接hiveserver2
> 以下是原文:
> HiveServer2 Endpoint is compatible with HiveServer2 wire protocol and allows 
> users to interact (e.g. submit Hive SQL) with Flink SQL Gateway with existing 
> Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on.
> 这里有提到Beeline工具,难道不是 beeline> !connect jdbc:flink://localhost:8083 这样的连接方式了?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-10-30 11:27:15,"Benchao Li"  写道:
> >Hi casel,
> >
> >Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway
> >的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。
> >
> >casel.chen  于2023年10月29日周日 17:24写道:
> >>
> >> 1. 启动flink集群
> >> bin/start-cluster.sh
> >>
> >>
> >> 2. 启动sql gateway
> >> bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2
> >>
> >>
> >> 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下
> >>
> >>
> >> 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
> >> $ bin/beeline
> >> SLF4J: Class path contains multiple SLF4J bindings.
> >> SLF4J: Found binding in 
> >> [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> SLF4J: Found binding in 
> >> [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> >> explanation.
> >> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
> >> Beeline version 3.1.2 by Apache Hive
> >> beeline> !connect jdbc:flink://localhost:8083
> >> Connecting to jdbc:flink://localhost:8083
> >> Enter username for jdbc:flink://localhost:8083:
> >> Enter password for jdbc:flink://localhost:8083:
> >> Failed to create the executor.
> >> 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
> >> . . . . . . . . . . . . . . . . . . . .>   a INT,
> >> . . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
> >> . . . . . . . . . . . . . . . . . . . .> ) WITH (
> >> . . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
> >> . . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
> >> . . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
> >> . . . . . . . . . . . . . . . . . . . .> );
> >> Failed to create the executor.
> >> Connection is already closed.
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li



-- 

Best,
Benchao Li


Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-29 文章 Benchao Li
Hi casel,

Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway
的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。

casel.chen  于2023年10月29日周日 17:24写道:
>
> 1. 启动flink集群
> bin/start-cluster.sh
>
>
> 2. 启动sql gateway
> bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2
>
>
> 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下
>
>
> 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车
> $ bin/beeline
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
> Beeline version 3.1.2 by Apache Hive
> beeline> !connect jdbc:flink://localhost:8083
> Connecting to jdbc:flink://localhost:8083
> Enter username for jdbc:flink://localhost:8083:
> Enter password for jdbc:flink://localhost:8083:
> Failed to create the executor.
> 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T(
> . . . . . . . . . . . . . . . . . . . .>   a INT,
> . . . . . . . . . . . . . . . . . . . .>   b VARCHAR(10)
> . . . . . . . . . . . . . . . . . . . .> ) WITH (
> . . . . . . . . . . . . . . . . . . . .>   'connector' = 'filesystem',
> . . . . . . . . . . . . . . . . . . . .>   'path' = 'file:///tmp/T.csv',
> . . . . . . . . . . . . . . . . . . . .>   'format' = 'csv'
> . . . . . . . . . . . . . . . . . . . .> );
> Failed to create the executor.
> Connection is already closed.
>


-- 

Best,
Benchao Li


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 文章 Benchao Li
Great work, thanks everyone involved!

Rui Fan <1996fan...@gmail.com> 于2023年10月27日周五 10:16写道:
>
> Thanks for the great work!
>
> Best,
> Rui
>
> On Fri, Oct 27, 2023 at 10:03 AM Paul Lam  wrote:
>
> > Finally! Thanks to all!
> >
> > Best,
> > Paul Lam
> >
> > > 2023年10月27日 03:58,Alexander Fedulov  写道:
> > >
> > > Great work, thanks everyone!
> > >
> > > Best,
> > > Alexander
> > >
> > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser 
> > > wrote:
> > >
> > >> Thank you all who have contributed!
> > >>
> > >> Op do 26 okt 2023 om 18:41 schreef Feng Jin 
> > >>
> > >>> Thanks for the great work! Congratulations
> > >>>
> > >>>
> > >>> Best,
> > >>> Feng Jin
> > >>>
> > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu  wrote:
> > >>>
> > >>>> Congratulations, Well done!
> > >>>>
> > >>>> Best,
> > >>>> Leonard
> > >>>>
> > >>>> On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee 
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks for the great work! Congrats all!
> > >>>>>
> > >>>>> Best,
> > >>>>> Lincoln Lee
> > >>>>>
> > >>>>>
> > >>>>> Jing Ge  于2023年10月27日周五 00:16写道:
> > >>>>>
> > >>>>>> The Apache Flink community is very happy to announce the release of
> > >>>>> Apache
> > >>>>>> Flink 1.18.0, which is the first release for the Apache Flink 1.18
> > >>>>> series.
> > >>>>>>
> > >>>>>> Apache Flink® is an open-source unified stream and batch data
> > >>>> processing
> > >>>>>> framework for distributed, high-performing, always-available, and
> > >>>>> accurate
> > >>>>>> data applications.
> > >>>>>>
> > >>>>>> The release is available for download at:
> > >>>>>> https://flink.apache.org/downloads.html
> > >>>>>>
> > >>>>>> Please check out the release blog post for an overview of the
> > >>>>> improvements
> > >>>>>> for this release:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > >>>>>>
> > >>>>>> The full release notes are available in Jira:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885
> > >>>>>>
> > >>>>>> We would like to thank all contributors of the Apache Flink
> > >> community
> > >>>> who
> > >>>>>> made this release possible!
> > >>>>>>
> > >>>>>> Best regards,
> > >>>>>> Konstantin, Qingsheng, Sergey, and Jing
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >



-- 

Best,
Benchao Li


Re: GenericRowData与BinaryRowData的转换

2023-03-13 文章 Benchao Li
Hi zilong,

应该是没有内置的方法直接进行转换的,如果有需要,还是需要自己根据schema做一遍读取和写入。

另外,在FLINK-24403[1] 中加强了对于复杂类型的print能力,可以直接把他们cast成string来打印。

[1] https://issues.apache.org/jira/browse/FLINK-24403

zilong xiao  于2023年3月13日周一 16:22写道:

> hi, benchao, 想问下有什么办法可以将BinaryRowData转成GenericRowData吗?我们业务场景需要对RowData
> toString,BinaryRowData没有实现该方法QQAQ
>
> Benchao Li  于2021年4月9日周五 10:42写道:
>
> > GenericRowData和BinaryRowData都是RowData这个接口的具体实现。
> > 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。
> >
> > 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData,
> > 比如典型的就是序列化的时候都会按照BinaryRowData来序列化。
> >
> > Luna Wong  于2021年4月8日周四 下午7:36写道:
> >
> > > 我看Kafka Connector源码生成的是GenericRowData,到Jdbc
> > > Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Flink SQL Client 解析 Protobuf

2022-07-03 文章 Benchao Li
Hi Min,

ProtoBuf Format[1] 有一个相关的PR,我们正在推进review和改进,预期是在1.16
中可以release出去。你也可以基于这个PR的代码编译打包一下,提前试用一下。

[1] https://github.com/apache/flink/pull/14376

Min Tu  于2022年7月4日周一 02:38写道:

> 各位大佬,
>
> 我们想利用 Flink SQL Client 解析 Kafka 数据流: 对于Kafka 数据流是Json 或者Avro 格式,已经可以解析,
> 但是对于 Protobuf 数据格式,Flink SQL Client 没有直接支持。
>
> Flink SQL Client 有一个 Raw 格式选项,不过我们还没有找到如何使用这个的文档;
>
> 看看大家有没有相关的经验可以分享
>
> 多谢
>


-- 

Best,
Benchao Li


Re: Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。

casel.chen  于2022年1月7日周五 07:42写道:

> mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-01-06 20:43:00,"Benchao Li"  写道:
> >这个问题可以用mini-batch[1]来解决呀
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation
> >
> >casel.chen  于2021年12月26日周日 18:01写道:
> >
> >> 你说的是upsert-kafka的这两个参数吗?
> >>
> >> sink.buffer-flush.max-rows
> >> sink.buffer-flush.interval
> >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-12-25 22:54:19,"郭伟权"  写道:
> >>
> >>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >> >
> >> >casel.chen  于2021年12月23日周四 08:15写道:
> >> >
> >> >> flink sql中aggregate without
> >> >>
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >> >>
> >> >>
> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >> >>
> >> >> orderid.   categorydt
> amt
> >> >>
> >> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >> >>
> >> >>
> >> >>
> >> >> INSERT INTO mysql_sink_table
> >> >>
> >> >> SELECT category, dt, LAST_VALUE(total)
> >> >>
> >> >> OVER (
> >> >>
> >> >>   PARTITION BY category
> >> >>
> >> >>   ORDER BY PROCTIME()
> >> >>
> >> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >> >>
> >> >> ) AS var1
> >> >>
> >> >> FROM (
> >> >>
> >> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category,
> dt
> >> >>
> >> >> );
> >>
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 

Best,
Benchao Li


Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
这个问题可以用mini-batch[1]来解决呀

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation

casel.chen  于2021年12月26日周日 18:01写道:

> 你说的是upsert-kafka的这两个参数吗?
>
> sink.buffer-flush.max-rows
> sink.buffer-flush.interval
> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink
> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-12-25 22:54:19,"郭伟权"  写道:
>
> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量
> >
> >casel.chen  于2021年12月23日周四 08:15写道:
> >
> >> flink sql中aggregate without
> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
> >>
> >>
> >> 例如有下面binlog cdc购买数据(订单购买金额会更新):
> >>
> >> orderid.   categorydt  amt
> >>
> >> 订单id 商品类型   购买时间(MMddHH)  购买金额
> >>
> >>
> >>
> >>
> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >>
> >>
> >>
> >> INSERT INTO mysql_sink_table
> >>
> >> SELECT category, dt, LAST_VALUE(total)
> >>
> >> OVER (
> >>
> >>   PARTITION BY category
> >>
> >>   ORDER BY PROCTIME()
> >>
> >>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >>
> >> ) AS var1
> >>
> >> FROM (
> >>
> >>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
> >>
> >> );
>


-- 

Best,
Benchao Li


Re: FlinkSQL Source和Sink的Operator name为什么格式不同

2021-10-02 文章 Benchao Li
Hi Ada,

这应该就是一个实现上的失误:

1.
source的名字是calcite的`TableScan#explainTerms`里面实现的,用的是`explainTerms(pw).item("table",
this.table.getQualifiedName())`
2.
sink里的名字是flink里的`Sink#explainTerms`实现的,用的是`.explainTerms(pw).item("table",
tableIdentifier.asSummaryString())`

yidan zhao  于2021年9月30日周四 上午11:43写道:

> 我猜哈,是因为source支持多张表。比如多个表union支持select的情况。
>
> Ada Luna  于2021年9月29日周三 下午6:02写道:
>
> > Source: TableSourceScan(table=[[default_catalog, default_database,
> > ods_k]], fields=[id, name])
> > Sink: Sink(table=[default_catalog.default_database.ads_k], fields=[id,
> > name])
> > Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id,
> > name]))
> >
> >
> > TableSourceScan 和 Sink相比多了个 中括号,并且采用 ',' 分割名字功空间,这是为什么
> >
>


-- 

Best,
Benchao Li


Re: flink-1.12.0 流模式 使用 lag问题

2021-09-21 文章 Benchao Li
Hi,这个是一个已知的bug[1],已经在1.13.1以及1.4版本修复了。
可以使用一下1.13.1试一下,1.4版本现在也正在投票中了,应该很快就会发布出来了。

[1] https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> 于2021年9月22日周三 上午11:41写道:

> 如何使用才是正确的,求大佬帮看看
> behavior,next_bv 字段内容一直是保持一致的,无法得到自己想要的结果
>
>
> 发送的数据
> {
> "user_id":1,
> "item_id":1,
> "behavior":"pv1"
> }
> {
> "user_id":1,
> "item_id":1,
> "behavior":"pv2"
> }
>
>
>
>
>
>
> CREATE TABLE KafkaTable (
>  `user_id` BIGINT,
>  `item_id` BIGINT,
>  `behavior` STRING,
>  proctime as PROCTIME()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = '',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'json'
> );
>
>
>
> SELECT
> user_id,
>     item_id,
> behavior,
> next_bv
> FROM
> ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER
> BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li


Re: Flink 从checkpoint恢复时,部分状态没有正确恢复

2021-08-30 文章 Benchao Li
这个问题已经在1.12中修复了,参考:
https://issues.apache.org/jira/browse/FLINK-18688

Benchao Li  于2021年8月30日周一 下午7:49写道:

> Hi xingxing,
>
> 看起来你可能也遇到了这个bug了。
> 我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化
> 的结果不稳定,进而导致状态恢复会错误。
> 先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、list、map、row等等;
> 然后再解释下这个结果不稳定的原因,这个是因为在底层的代码生成里面有一个优化,会按照类型进行分组,然后进行优化,
> 但是这个分组的过程用的是一个HashMap[1],会导致字段顺序不是确定性的,有时候是这个顺序,有时候又是另外一个顺序,
> 导致最终的BinaryRow的序列化结果是不稳定的,进而导致无法从checkpoint恢复。
>
> 然后典型的多种变长类型,其实是varchar nullable 和 varchar not null 以及
> char(n),尤其是你这种用了很多常量字符串的场景,
> 容易产生后两种类型,在加上普通字段以及函数都会产生的第一种类型,就会触发这个bug了。
>
> [1]
> https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala#L92
>
> dixingxing  于2021年8月25日周三 下午8:57写道:
>
>> Hi Flink 社区:
>> 我们的Flink版本是1.9.2,用的是blink planer,我们今天遇到一个问题,目前没有定位到原因,现象如下:
>>
>> 手动重启任务时,指定了从一个固定的checkpoint恢复时,有一定的概率,一部分状态数据无法正常恢复,启动后Flink任务本身可以正常运行,且日志中没有明显的报错信息。
>> 具体现象是:type=realshow的数据没有从状态恢复,也就是从0开始累加,而type=show和type=click的数据是正常从状态恢复的。
>>
>>
>> SQL大致如下:
>> createview view1 as
>> select event_id, act_time, device_id
>> from table1
>> where `getStringFromJson`(`act_argv`, 'ispin', '') <>'1'
>> and event_id in
>> ('article_newest_list_show','article_newest_list_sight_show',
>> 'article_list_item_click', 'article_auto_video_play_click');
>>
>>
>> --天的数据
>> insertinto table2
>> select platform, type, `time`, count(1) as pv, hll_uv(device_id) as uv
>> from
>> (select'03'as platform, trim(casewhen event_id
>> ='article_newest_list_show'then'show'
>> when event_id ='article_newest_list_sight_show'then'realshow'
>> when event_id ='article_list_item_click'then'click'else''end) astype,
>> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`,
>> device_id
>> from view1
>> where event_id in
>> ('article_newest_list_show','article_newest_list_sight_show',
>> 'article_list_item_click')
>> unionall
>> select'03'as platform, 'click_total'astype,
>> `date_parse`(`act_time`, 'yyyy-MM-dd HH:mm:ss', 'yyyyMMdd') as `time`,
>> device_id
>> from view1
>> where event_id in ('article_list_item_click',
>> 'article_auto_video_play_click'))a
>> groupby platform, type, `time`;
>>
>>
>> 期待大家的帮助与回复,希望能给些问题排查的思路!
>>
>>
>>
>>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: Flink 从checkpoint恢复时,部分状态没有正确恢复

2021-08-30 文章 Benchao Li
Hi xingxing,

看起来你可能也遇到了这个bug了。
我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化
的结果不稳定,进而导致状态恢复会错误。
先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、list、map、row等等;
然后再解释下这个结果不稳定的原因,这个是因为在底层的代码生成里面有一个优化,会按照类型进行分组,然后进行优化,
但是这个分组的过程用的是一个HashMap[1],会导致字段顺序不是确定性的,有时候是这个顺序,有时候又是另外一个顺序,
导致最终的BinaryRow的序列化结果是不稳定的,进而导致无法从checkpoint恢复。

然后典型的多种变长类型,其实是varchar nullable 和 varchar not null 以及
char(n),尤其是你这种用了很多常量字符串的场景,
容易产生后两种类型,在加上普通字段以及函数都会产生的第一种类型,就会触发这个bug了。

[1]
https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala#L92

dixingxing  于2021年8月25日周三 下午8:57写道:

> Hi Flink 社区:
> 我们的Flink版本是1.9.2,用的是blink planer,我们今天遇到一个问题,目前没有定位到原因,现象如下:
>
> 手动重启任务时,指定了从一个固定的checkpoint恢复时,有一定的概率,一部分状态数据无法正常恢复,启动后Flink任务本身可以正常运行,且日志中没有明显的报错信息。
> 具体现象是:type=realshow的数据没有从状态恢复,也就是从0开始累加,而type=show和type=click的数据是正常从状态恢复的。
>
>
> SQL大致如下:
> createview view1 as
> select event_id, act_time, device_id
> from table1
> where `getStringFromJson`(`act_argv`, 'ispin', '') <>'1'
> and event_id in
> ('article_newest_list_show','article_newest_list_sight_show',
> 'article_list_item_click', 'article_auto_video_play_click');
>
>
> --天的数据
> insertinto table2
> select platform, type, `time`, count(1) as pv, hll_uv(device_id) as uv
> from
> (select'03'as platform, trim(casewhen event_id
> ='article_newest_list_show'then'show'
> when event_id ='article_newest_list_sight_show'then'realshow'
> when event_id ='article_list_item_click'then'click'else''end) astype,
> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`,
> device_id
> from view1
> where event_id in
> ('article_newest_list_show','article_newest_list_sight_show',
> 'article_list_item_click')
> unionall
> select'03'as platform, 'click_total'astype,
> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`,
> device_id
> from view1
> where event_id in ('article_list_item_click',
> 'article_auto_video_play_click'))a
> groupby platform, type, `time`;
>
>
> 期待大家的帮助与回复,希望能给些问题排查的思路!
>
>
>
>

-- 

Best,
Benchao Li


Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。

2021-04-13 文章 Benchao Li
这是一个已知的问题。社区已经有相关issue[1] 在跟进修复了,感兴趣的话可以了解下~

[1] https://issues.apache.org/jira/browse/FLINK-18934

jie mei  于2021年4月12日周一 下午2:59写道:

> 此外,在事件时间,场景下,如果一个 Stream A 有消息, 另一个 Stream B 没有消息进行 UNION ALL。那么 Stream B
> 的消息永远是一个 Long.MIN_VALUE, 进行水印对其的时候,UNION ALL 后的水印取所有 CHANNEL 的最小水印,也就是
> Long.MIN_VALUE, 这就导致分组滚动窗口一致得不到计算。
>
> jie mei  于2021年4月12日周一 上午11:24写道:
>
> > 问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。
> >
> > jie mei  于2021年4月12日周一 上午1:49写道:
> >
> >> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator,  下,
> >> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput
> >> 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() ==
> false。
> >>
> >> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL
> >>
> >> create table input_table (
> >> `dim` varchar,
> >>  `server_time` bigint,
> >>  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000,
> >> '-MM-dd HH:mm:ss')),
> >>  WATERMARK FOR `event_time` AS `event_time`
> >> )
> >> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
> >> `dim`,
> >> count(1),
> >> FROM input_table
> >>  GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`
> >>
> >>
> >>
> >> *Best Regards*
> >> *Jeremy Mei*
> >>
> >
> >
> > --
> >
> > *Best Regards*
> > *Jeremy Mei*
> >
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


-- 

Best,
Benchao Li


Re: GenericRowData与BinaryRowData的转换

2021-04-08 文章 Benchao Li
GenericRowData和BinaryRowData都是RowData这个接口的具体实现。
所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。

关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData,
比如典型的就是序列化的时候都会按照BinaryRowData来序列化。

Luna Wong  于2021年4月8日周四 下午7:36写道:

> 我看Kafka Connector源码生成的是GenericRowData,到Jdbc
> Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么?
>


-- 

Best,
Benchao Li


Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
嗯,是这样的。

datayangl  于2021年3月19日周五 下午5:55写道:

> calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
> 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
Hi datayangl,

这是因为kafka_table.src_ip <>
null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。

关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1].
简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown,
然后这个在filter条件里面会被视作false。

[1] https://modern-sql.com/concept/three-valued-logic

datayangl  于2021年3月19日周五 下午4:02写道:

> 环境:flink1.11:
> 代码如下:
> val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
> val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv
> val sql = """SELECT
>   CASE
> WHEN
>   kafka_table.log_type = 'detect'
>   AND
>   kafka_table.event_level = 3
> THEN 3
> ELSE 0
>   END as weight,
>   kafka_table.src_ip as kafka_table_src_ip_0,
>   kafka_table.dev_type as kafka_table_dev_type_0
> FROM
>   kafka_table
> WHERE
>   kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5
>   AND
>   kafka_table.src_ip <> null
>   AND
>   kafka_table.event_level > 0
>   AND
>   kafka_table.dev_type = 1
>
>
> val data:Table = tableEnv.sqlQuery(sql)
> val result = tableEnv.toRetractStream[Row](data)
> result.print(">")
> """
>
>
>
> 现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip
> is
> not null 可以正常运行并一直产生数据。
>
> 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flink sql 的 count(distinct )问题

2021-03-16 文章 Benchao Li
Hi,
你可以理解为用的是MapState来保存的状态。

op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道:

> 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态



-- 

Best,
Benchao Li


Re: UDF 重复调用的问题、

2021-03-02 文章 Benchao Li
我没有搜到相关的issue,所以我先建了一个issue[1]。
这个优化相对来说影响比较大,需要仔细的设计和权衡,所以在社区推进的速度
可能没有办法保证,大家感兴趣的可以在issue里去讨论。

[1] https://issues.apache.org/jira/browse/FLINK-21573

Qishang  于2021年3月3日周三 上午11:03写道:

> Hi Benchao.
>
> 现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。
> 这个 feature 社区有规划了吗?
>
>
> Benchao Li  于2021年3月3日周三 上午10:23写道:
>
> > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
> >
> > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> > plan的过程中会将表达式完全展开,比如下面的SQL:
> > ```SQL
> > SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
> > key3
> > FROM (
> >   SELECT dump_json_to_map(col1) as my_map
> >   FROM T
> > )
> > ```
> > 这种写法也会将`dump_json_to_map`这个函数执行3次。
> >
> > HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道:
> >
> > > 为什么4次是没问题的,感觉只执行一次才是最优的
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: UDF 重复调用的问题、

2021-03-02 文章 Benchao Li
当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。

这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
plan的过程中会将表达式完全展开,比如下面的SQL:
```SQL
SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as
key3
FROM (
  SELECT dump_json_to_map(col1) as my_map
  FROM T
)
```
这种写法也会将`dump_json_to_map`这个函数执行3次。

HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道:

> 为什么4次是没问题的,感觉只执行一次才是最优的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: 关于 stream-stream Interval Join 的问题

2020-12-10 文章 Benchao Li
你用的是哪个版本的Flink呢?

看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
所以你的binlog是怎么读进来的呢?自定义的format?

macia kk  于2020年12月10日周四 上午1:06写道:

> 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time -
> INTERVAL 'x' HOUR
>
>  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
>
> 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> 能够反推出来数据的 currentMaxTimestamp
>
> currentMaxTimestamp = watermark + maxOutOfOrderness
>
> 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
>
>
> 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
>
> {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
> *2020-12-10T01:02:24Z*"}
>
> UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> GMT+08:00)
>
> 这个 watermark 是未来的时间 
>
>
>
>
>
> macia kk  于2020年12月9日周三 下午11:36写道:
>
> > 感谢 一旦 和 Benchao
> >
> >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我
> Job
> > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> >
> > val result = bsTableEnv.sqlQuery("""
> >SELECT *
> >FROM (
> >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
> t1.transaction_id,
> > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> t1.status, t1.event_time
> >   FROM main_db as t1
> >   LEFT JOIN main_db as t2
> >   ON t1.reference_id = t2.reference_id
> >   WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> >AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> >)
> >   """.stripMargin)
> >
> > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> >
> > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > subtask的watermark。
> > ---
> > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> >
> > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
> > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> >
> > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> >
> >
> > Thanks and best regards
> >
> >
> > Benchao Li  于2020年12月9日周三 上午10:24写道:
> >
> >> Hi macia,
> >>
> >> 一旦回答的基本比较完整了。
> >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> >>
> >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> subtask见到的最大的watermark
> >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> 10个小时,这个已经会导致
> >> 你的没有join到的数据下发会延迟很多了。
> >>
> >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> >>
> >> 赵一旦  于2020年12月9日周三 上午10:15写道:
> >>
> >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> >> >
> >> >
> >> >
> >>
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> >> > join。
> >> >
> >> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> >> >
> >> >
> >>
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> >> >
> >> > macia kk  于2020年12月9日周三 上午1:17写道:
> >> >
> >> > > @Benchao Li   感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> >> > > FLink,可能我的Case 太特殊了.
> >> > >
> >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> >> Binlog,我需要
> >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个
> DB
> >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> >> > >
> >> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> >> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> >> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> >> > watermark
> >> > > forward on.
> >> > >
> >> > > bsTableEnv.execute

Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
反压的话,你可以重点看下你使用的是什么state backend,
如果是filesystem,那状态就是放heap的,这种你需要重点看下gc相关的问题;
如果是rocksdb,这种状态是直接序列化到rocksdb中了,一般很少有内存问题,更多的是IO问题,或者CPU瓶颈。
你可以按照找个思路排查一下。

song wang  于2020年12月10日周四 上午11:38写道:

> hi,Benchao,
> 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢!
>
> Benchao Li  于2020年12月10日周四 上午11:28写道:
>
> > 你可以检查下在Checkpoint失败的时候是不是任务已经在反压了,
> > 看起来是有可能因为反压导致的Checkpoint超时失败。
> >
> > song wang  于2020年12月10日周四 上午10:59写道:
> >
> > > 各位好,
> > > 两个流进行interval join,时间窗口是
> > -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager
> > > log中的报错信息为:
> > >
> > > 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint.
> > > CheckpointCoordinator - Checkpoint 143 of job
> > > ee4114a1c5413bd02a68b1165090578e expired before completing.
> > >
> > >
> > > 无其他报错信息,最大checkpoint时间为10min;
> > >
> > >
> > > flink版本:1.9.0
> > >
> > > checkpooint配置信息为:
> > >
> > > env.enableCheckpointing(60);
> > >
> > >
> >
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > >
> > > 各位大佬能否给些排查建议呢?
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
你可以检查下在Checkpoint失败的时候是不是任务已经在反压了,
看起来是有可能因为反压导致的Checkpoint超时失败。

song wang  于2020年12月10日周四 上午10:59写道:

> 各位好,
> 两个流进行interval join,时间窗口是 -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager
> log中的报错信息为:
>
> 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator - Checkpoint 143 of job
> ee4114a1c5413bd02a68b1165090578e expired before completing.
>
>
> 无其他报错信息,最大checkpoint时间为10min;
>
>
> flink版本:1.9.0
>
> checkpooint配置信息为:
>
> env.enableCheckpointing(60);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> 各位大佬能否给些排查建议呢?
>
>
>
>
>
>
>

-- 

Best,
Benchao Li


Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 Benchao Li
Hi macia,

一旦回答的基本比较完整了。
watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
如果是两侧都有数据,watermark不前进,也都可以正常输出。

关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
你的没有join到的数据下发会延迟很多了。

你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。

赵一旦  于2020年12月9日周三 上午10:15写道:

> 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>
>
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> join。
>
> (2)此外,还有一个点,这个我也不确认。如果是datastream
> api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>
> macia kk  于2020年12月9日周三 上午1:17写道:
>
> > @Benchao Li   感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > FLink,可能我的Case 太特殊了.
> >
> > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
> > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
> > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> >
> > 还要注意的是 even time 是 create_time, 这里问题非常大:
> >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> watermark
> > forward on.
> >
> > bsTableEnv.executeSql("""
> >   CREATE TABLE input_database (
> > `table` STRING,
> > `database` STRING,
> > `data` ROW(
> >   reference_id STRING,
> >   transaction_sn STRING,
> >   transaction_type BIGINT,
> >   merchant_id BIGINT,
> >   transaction_id BIGINT,
> >   status BIGINT
> >  ),
> > ts BIGINT,
> > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
> >  ) WITH (
> >'connector.type' = 'kafka',
> >'connector.version' = '0.11',
> >'connector.topic' = 'mytopic',
> >'connector.properties.bootstrap.servers' = '',
> >'format.type' = 'json'
> >  )
> > """)
> >
> >
> > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> >
> > val main_db = bsTableEnv.sqlQuery("""
> >   | SELECT *
> >   | FROM input_database
> >   | WHERE `database` = 'main_db'
> >   |  AND `table` LIKE 'transaction_tab%'
> >   | """.stripMargin)
> >
> > val merchant_db = bsTableEnv.sqlQuery("""
> >   | SELECT *
> >   | FROM input_database
> >   | WHERE `database` = 'merchant_db'
> >   |   AND `table` LIKE 'transaction_tab%'
> >   | """.stripMargin)
> >
> > bsTableEnv.createTemporaryView("main_db", main_db)
> > bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> >
> > val result = bsTableEnv.sqlQuery("""
> >SELECT *
> >FROM (
> >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > t1.transaction_id,
> > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > t1.status, t1.event_time
> >   FROM main_db as t1
> >   LEFT JOIN merchant_db as t2
> >       ON t1.reference_id = t2.reference_id
> >   WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
> >AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
> >)
> >   """.stripMargin)
> >
> >
> >
> > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > -
> > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
> > 来驱动。
> > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
> > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.
> >
> >
> >
> >
> >
> >
> > Benchao Li  于2020年12月8日周二 下午3:23写道:
> >
> > > hi macia,
> > >
> > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
> > >
> > > macia kk  于2020年12月8日周二 上午1:15写道:
> > >
> > > > 抱歉,是 >-30 and <+30
> > > >
> > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> > > >
> > > > 赵一旦 于2020年12月7日 周一23:28写道:
> > > >
> > > > > 准确点,2个条件之间没and?2个都是>?
> > > > >
> > > > > macia kk  于2020年12月7日周一 下午10:30写道:
> >

Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 Benchao Li
hi macia,

事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?

macia kk  于2020年12月8日周二 上午1:15写道:

> 抱歉,是 >-30 and <+30
>
> 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
>
> 赵一旦 于2020年12月7日 周一23:28写道:
>
> > 准确点,2个条件之间没and?2个都是>?
> >
> > macia kk  于2020年12月7日周一 下午10:30写道:
> >
> > > 不好意思,我上边贴错了
> > >
> > > SELECT *
> > >  FROM A
> > >  LEFT OUT JOIN B
> > >  ON order_id
> > >  Where A.event_time > B.event_time -  30 s
> > >  A.event_time > B.event_time + 30 s
> > >
> > > event_time 是 Time Attributes 设置的 event_time
> > >
> > > 这样是没有输出的。
> > >
> > >
> > >
> > > interval join 左右表在 state 中是缓存多久的?
> > >
> > >
> > >
> > >
> > >
> > >
> > > hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道:
> > >
> > > > Hi,
> > > > 其中 条件是
> > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > B.event_time
> > > > - 30 s ` 吧
> > > > 可以参考以下例子[1],看下有木有写错。
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > >
> > > >
> > > > Best,
> > > > Hailong
> > > > 在 2020-12-07 13:10:02,"macia kk"  写道:
> > > > >Hi, 各位大佬
> > > > >
> > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> > > item
> > > > >信息,所以 我用:
> > > > >
> > > > > SELECT *
> > > > > FROM A
> > > > > LEFT OUT JOIN B
> > > > > ON order_id
> > > > > Where A.event_time > B.event_time + 30 s
> > > > > A.event_time > B.event_time - 30 s
> > > > >
> > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > Structural
> > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


Re: flink-json 函数用法

2020-11-29 文章 Benchao Li
Hi,

目前Flink SQL应该还没有正式支持json函数吧,上面的报错信息看起来也是符合预期的,说的是目前还找不到这个函数。

相关信息可以参考:https://issues.apache.org/jira/browse/FLINK-9477


Yan,Yunpeng(DXM,PB)  于2020年11月30日周一 下午2:18写道:

> Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from income_fee
> > ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: No match found for
> function signature JSON_OBJECT(, , )
>
> Flink SQL> select JSON_OBJECT('product_type' VALUE product_type)
> > from sp_income_fee
> > where enabled = 1
> > group by id;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: No match found for
> function signature JSON_OBJECT(, , )
>
> Flink SQL> select JSON_ARRAYAGG(product_type)
> > from income_fee
> > where f_enabled = 1;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Unsupported Function:
> 'JSON_ARRAYAGG_ABSENT_ON_NULL'
>
> 闫云鹏
> DXM 支付业务部
> 地址:北京市海淀区西北旺东路度小满金融总部
> 邮编:100085
> 手机:13693668213
> 邮箱:yanyunp...@duxiaoman.com
>
> 度小满金融
> 精于科技 值得信赖
>
>
>
> 在 2020/11/30 11:05,“caozhen” 写入:
>
> 可以把使用方法和 报错信息 发下嘛?
>
>
>
>
> Yan,Yunpeng(DXM,PB) wrote
> > Hi:
> >   尝试使用flink-sql将聚合结果json展示的时候发现flink是支持JSON_OBJECTAGG, JSON_ARRAY,
> > JSON_OBJECT 等这种函数的(使用的默认的blink),
> 但是总是报错函数的用法不对,有相关资料来介绍这些函数的使用方法的吗?或者示例
> >
> > 闫云鹏
> > DXM 支付业务部
> > 地址:北京市海淀区西北旺东路度小满金融总部
> > 邮编:100085
> > 手机:13693668213
> > 邮箱:
>
> > yanyunpeng@
>
> > mailto:
>
> > yanyunpeng@
>
> > 
> >
> > 度小满金融
> >
> > 精于科技 值得信赖
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 

Best,
Benchao Li


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
从这一行代码看出来的:
https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107

现在社区还没有正式支持ProtoBuf Format,不过已经有相关issue和讨论了[1]

[1] https://issues.apache.org/jira/browse/FLINK-18202

zilong xiao  于2020年11月24日周二 下午4:46写道:

> 这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢?
>
> Benchao Li  于2020年11月24日周二 下午4:33写道:
>
> > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。
> >
> > zilong xiao  于2020年11月24日周二 下午4:13写道:
> >
> > > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
> > > https://github.com/yangyichao-mango/flink-protobuf
> > >
> > > Benchao Li  于2020年11月24日周二 下午3:43写道:
> > >
> > > > 看起来你的DDL写的没有什么问题。
> > > >
> > > > 你用的是哪个Flink版本呢?
> > > > 此外就是可以发下更完整的异常栈么?
> > > >
> > > > zilong xiao  于2020年11月24日周二 下午2:54写道:
> > > >
> > > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
> > > > >
> > > > > Benchao Li  于2020年11月24日周二 下午2:49写道:
> > > > >
> > > > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> > > > > >
> > > > > > zilong xiao  于2020年11月24日周二 上午10:49写道:
> > > > > >
> > > > > > > [image: image.png]
> > > > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Best,
> > > > > > Benchao Li
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
看起来这个format是用的自动推导schema,而不是用的DDL写的schema。

zilong xiao  于2020年11月24日周二 下午4:13写道:

> 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
> https://github.com/yangyichao-mango/flink-protobuf
>
> Benchao Li  于2020年11月24日周二 下午3:43写道:
>
> > 看起来你的DDL写的没有什么问题。
> >
> > 你用的是哪个Flink版本呢?
> > 此外就是可以发下更完整的异常栈么?
> >
> > zilong xiao  于2020年11月24日周二 下午2:54写道:
> >
> > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
> > >
> > > Benchao Li  于2020年11月24日周二 下午2:49写道:
> > >
> > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> > > >
> > > > zilong xiao  于2020年11月24日周二 上午10:49写道:
> > > >
> > > > > [image: image.png]
> > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
看起来你的DDL写的没有什么问题。

你用的是哪个Flink版本呢?
此外就是可以发下更完整的异常栈么?

zilong xiao  于2020年11月24日周二 下午2:54写道:

> Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
>
> Benchao Li  于2020年11月24日周二 下午2:49写道:
>
> > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> >
> > zilong xiao  于2020年11月24日周二 上午10:49写道:
> >
> > > [image: image.png]
> > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。

zilong xiao  于2020年11月24日周二 上午10:49写道:

> [image: image.png]
> 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
>


-- 

Best,
Benchao Li


Re: Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常

2020-10-26 文章 Benchao Li
这个问题已经解决了,在1.11.1版本应该就已经修复了。
可以贴下具体的代码和异常栈,看下是不是还有其他问题,还是使用方式的问题。

tonychen  于2020年10月26日周一 下午6:49写道:

> 这个问题解决了吗?现在1.11.2仍然有这个问题,或者有什么临时解决方案,
> registerFunction已经不好使了,createTemporarySystemFunction 报错 No match found for
> function signature
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flinksql 不支持 % 运算

2020-10-26 文章 Benchao Li
1.11的话通过配置是无法实现的。可以把这个pr[1] cherry-pick到1.11的分支上编译一下来实现1.11上使用%

[1] https://github.com/apache/flink/pull/12818

夜思流年梦  于2020年10月26日周一 下午4:16写道:

> flink 版本1.11
> 目前flink-sql 好像不支持取余运算,会报错:
> 比如:SELECT * FROM Orders WHERE a % 2 = 0
> Percent remainder '%' is not allowed under the current SQL conformance
> level
>
>
> 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复
>
>
>
>
> 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml



-- 

Best,
Benchao Li


Re: 在使用hive catalog的情况下 json format 大小写问题

2020-10-21 文章 Benchao Li
现在的json format就是支持大小写区分的吧。可以提供下你的DDL和样例数据么?

王刚  于2020年10月21日周三 下午8:08写道:

> hi~ 各位大佬,
>
> 由于catalog是不支持大小写 如果kafka 的数据是json格式,且某些json的key区分大小写的,
>
> 这个时候建的在hive catalog里建的kafka表的某些json字段是取不到的
>
> flink 1.11有现成解决的方案可以解决这种问题不
>


-- 

Best,
Benchao Li


Re: Re: flink sql ddl 是否支持映射多层json

2020-10-21 文章 Benchao Li
嗯,道理是一样的。ROW/MAP/ARRAY这些本来就是嵌套类型,嵌套深度没有限制

Roc Marshal  于2020年10月21日周三 下午2:38写道:

> 如果是深度是三层以上也是类似的嵌套语法吗?或者说是其他的写法?
>
>
> 谢谢
>
> Best Roc.
>
>
>
>
>
> 在 2020-09-24 20:53:12,"Benchao Li"  写道:
> >这个情况现在是支持的,可以用类似于这种写法:
> >```SQL
> >CREATE TABLE MyTable (
> >  a11 INT,
> >  a12 VARCHAR,
> >  a13 ROW
> >) WITH (...)
> >```
> >
> >Roc Marshal  于2020年9月24日周四 下午7:54写道:
> >
> >> 请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢?
> >> {
> >> "a11":1,
> >> "a12":"1",
> >> "a13":{
> >> "a21":1,
> >> "a22":1,
> >> "a23":"1"}
> >> }
> >>
> >>
> >> 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持?
> >>
> >>
> >> 谢谢
> >
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 

Best,
Benchao Li


Re: flinkSQL1.11写出数据到jdbc fleld type do not match

2020-10-19 文章 Benchao Li
Sink
> default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
> lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
> VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
> VARCHAR(2147483647), targets: ARRAY `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
> product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
> platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
> languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
> MAP]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647),
> lng:
> VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
> VARCHAR(2147483647)]
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> at scala.Option.map(Option.scala:146)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
> at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
> at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
> at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
> at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: SQL interval join 问题

2020-10-19 文章 Benchao Li
Hi Mic,

感谢关注这个issue,这个issue当前还在讨论中。
我认为问题已经定位清楚了,抄送了其他的committer同学进一步讨论确认。

Mic  于2020年10月19日周一 下午3:51写道:

> 搜了一下,目前是有一个 issue 看起来相关,https://issues.apache.org/jira/browse/FLINK-18996
> 不知道处理进度如何?
> 在 2020-10-19 15:03:54,"Mic"  写道:
> >现有 SQL 语句如下:
> >create table source1(
> >  id varchar PRIMARY KEY,
> >  a varchar,
> >  proctime AS PROCTIME()
> >) with (
> >'connector' = 'kafka'
> >...
> >);
> >create table source2(
> >  id varchar PRIMARY KEY,
> >  a varchar,
> >  proctime AS PROCTIME()
> >) with (
> >'connector' = 'kafka'
> >...
> >);
> >select
> >  case
> >when s1.id is not null then s1.id
> >else s2.id
> >  end as ids,
> >  s1.a, s2.b
> >from source1 as s1 full outer join source2 as s2 on s1.id = s2.id where
> s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime +
> INTERVAL '5' SECOND;
> >
> >
> >最后的 join 语句预期是 如果两个source的消息, 先后到达时间超过 10 秒,则输出,
>  两条消息。
> >
> >
> >目前的观察结果是,如果两条消息, 先后到达时间超过10 秒,输出为:, 
> >为何超过 10 秒后,仍然会输出  ?
>


-- 

Best,
Benchao Li


Re: 回复: flink 自定义udf注册后不能使用

2020-10-18 文章 Benchao Li
Hi,
当前可以理解Flink注册UDF有三种类型:
- TEMPORARY SYSTEM FUNCTION
- TEMPORARY CATALOG FUNCTION
- CATALOG FUNCTION

加上内置的SYSTEM FUNCTION
可以认为一共有四种,他们的解析顺序为:
1. TEMPORARY SYSTEM FUNCTION
2. SYSTEM FUNCTION
3. TEMPORARY CATALOG FUNCTION
4. CATALOG FUNCTION

所以你观察到TEMPORARY SYSTEM FUNCTION会覆盖内置函数,但是TEMPORARY CATALOG FUNCTION不会覆盖
这个现象是没有问题的。


amen...@163.com  于2020年10月16日周五 下午3:46写道:

> 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug
>
> best,
> amenhub
>
> 发件人: 史 正超
> 发送时间: 2020-10-16 15:26
> 收件人: user-zh@flink.apache.org
> 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用
> 你这样创建试一下,或者换个名字试试
>
> CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS
> 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
>
> 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY
> SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以
>
> 
> 发件人: 奔跑的小飞袁 
> 发送时间: 2020年10月16日 6:47
> 收件人: user-zh@flink.apache.org 
> 主题: Re: 回复:回复: flink 自定义udf注册后不能使用
>
> 是的,是我传参有问题
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flinksql如何控制结果输出的频率

2020-10-14 文章 Benchao Li
可以具体描述下你的问题么,没太看懂你的问题。

smallwong  于2020年10月14日周三 下午6:57写道:

> 哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
是的,所以应该用createNullableConverter,而不是createConverter

史 正超  于2020年10月14日周三 上午10:45写道:

>
> 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is
> not null.
> 代码这样写的前提是,不允许对象的值为null的。
> ____
> 发件人: Benchao Li 
> 发送时间: 2020年10月14日 2:34
> 收件人: user-zh 
> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常
>
> 嗯,这应该是一个实现的bug,可以提个issue修复一下~
>
> 史 正超  于2020年10月14日周三 上午10:19写道:
>
> > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的:
> >
> > case CHAR:
> > case VARCHAR:
> >return avroObject -> StringData.fromString(avroObject.toString());
> >
> > 所以,你的map类型的value值为null,会报空指针异常的。
> > 
> > 发件人: 奔跑的小飞袁 
> > 发送时间: 2020年10月14日 1:46
> > 收件人: user-zh@flink.apache.org 
> > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常
> >
> > other_para MAP
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
嗯,这应该是一个实现的bug,可以提个issue修复一下~

史 正超  于2020年10月14日周三 上午10:19写道:

> 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的:
>
> case CHAR:
> case VARCHAR:
>return avroObject -> StringData.fromString(avroObject.toString());
>
> 所以,你的map类型的value值为null,会报空指针异常的。
> 
> 发件人: 奔跑的小飞袁 
> 发送时间: 2020年10月14日 1:46
> 收件人: user-zh@flink.apache.org 
> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常
>
> other_para MAP
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
ro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.lang.NullPointerException
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
> ~[flink-avro-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[lexus-flink_2.11-0.1.jar:?]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> private static DeserializationRuntimeConverter
> createMapConverter(LogicalType type) {
> final DeserializationRuntimeConverter keyConverter =
> createConverter(
> DataTypes.STRING().getLogicalType());
> final DeserializationRuntimeConverter valueConverter =
> createConverter(
> extractValueTypeToAvroMap(type));
> return avroObject -> {
> final Map map = (Map) avroObject;
> Map result = new HashMap<>();
> for (Map.Entry entry : map.entrySet()) {
> Object key =
> keyConverter.convert(entry.getKey());
> Object value =
> valueConverter.convert(entry.getValue());
> result.put(key, value);
> }
> return new GenericMapData(result);
> };
> }
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: Flink的table-api不支持.

2020-10-09 文章 Benchao Li
user是关键字,需要用`user`来处理一下~

Kyle Zhang  于2020年10月9日周五 上午8:34写道:

> 试一试select *  from  OrderA orderA join OrderB orderB on
> orderA.user=orderB.user
>
> On Sun, Oct 4, 2020 at 5:09 PM 忝忝向仧 <153488...@qq.com> wrote:
>
> > Hi,all:
> >
> >
> > Table api的sql查询里面join的时候不能写"."么?
> > 这样写就会报错 如下
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> > SQL parse failed. Encountered "." at line 1, column 36.
> > Was expecting one of:
> >>   "EXCEPT" ...
> >   "FETCH" ...
> >   "FROM" ...
> >   "INTERSECT" ...
> >   "LIMIT" ...
> >   "OFFSET" ...
> >   "ORDER" ...
> >   "MINUS" ...
> >   "UNION" ...
> >   "," ...
> >
> >
> >
> > Table result = tEnv.sqlQuery("select *  from  OrderA join OrderB on
> > OrderA.user=OrderB.user");
>


-- 

Best,
Benchao Li


Re: Re: sql-cli执行sql报错

2020-09-29 文章 Benchao Li
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么?

hl9...@126.com  于2020年9月28日周一 下午6:06写道:

> 按照您的方法重试了下,又报了另一个错误:
> Flink SQL> CREATE TABLE tx (
> > account_id  BIGINT,
> > amount  BIGINT,
> > transaction_time TIMESTAMP(3),
> > WATERMARK FOR transaction_time AS transaction_time -
> INTERVAL '5' SECOND
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'heli01',
> > 'connector.properties.group.id' = 'heli-test',
> > 'connector.properties.bootstrap.servers' = '
> 10.100.51.56:9092',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type'= 'csv'
> > );
> [INFO] Table has been created.
>
> Flink SQL> show tables ;
> tx
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
> instance of
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
>
> 附:lib包清单
> [test@rcx51101 lib]$ pwd
> /opt/flink-1.10.2/lib
>
> flink-csv-1.10.2.jar
> flink-dist_2.12-1.10.2.jar
> flink-jdbc_2.12-1.10.2.jar
> flink-json-1.10.2.jar
> flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
> flink-sql-connector-kafka_2.11-1.10.2.jar
> flink-table_2.12-1.10.2.jar
> flink-table-blink_2.12-1.10.2.jar
> log4j-1.2.17.jar
> mysql-connector-java-5.1.48.jar
> slf4j-log4j12-1.7.15.jar
>
>
>
>
> hl9...@126.com
>
> 发件人: Leonard Xu
> 发送时间: 2020-09-28 16:36
> 收件人: user-zh
> 主题: Re: sql-cli执行sql报错
> Hi
> benchao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>


-- 

Best,
Benchao Li


Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 文章 Benchao Li
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。
你可以检查下你的kafka topic。
目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。

Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道:

> 你好!
>   我使用flink
> sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
>   使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?



-- 

Best,
Benchao Li


Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 文章 Benchao Li
你的timeout方法应该要正确的处理ResultFuture,
比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。

王敏超  于2020年9月29日周二 下午5:43写道:

>  AsyncDataStream
>   //顺序异步IO
>   .orderedWait(input, new AsyncDatabaseRequest(), 5000,
> TimeUnit.MILLISECONDS, 1000)
>
>   当我没重写timeout方法的时候,会执行这个报错信息
> resultFuture.completeExceptionally(new TimeoutException("Async function
> call
> has timed out."))
>
>
>   当我重写了timeout方法,如下,程序就卡住了,求大佬解答。
>   override def timeout(input: String, resultFuture: ResultFuture[Int]):
> Unit
> = {
> println("time out ... ")
>   }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 Benchao Li
Hi Ericliuk,

这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~

Ericliuk  于2020年9月29日周二 下午4:59写道:

> 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> <
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png>
>
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Benchao Li
Hi Robin,

目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
当前如果你想实现类似功能,可以先自己写一个udaf来做。

[1] https://issues.apache.org/jira/browse/FLINK-19449

Robin Zhang  于2020年9月29日周二 下午2:04写道:

> 环境: flink 1.10,使用flinkSQL
>
> kafka输入数据如:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>
> sql如下:
>
> INSERT INTO topic_sink
> SELECT
>   t,
>   id,
>   speed,
>   LAG(speed, 1) OVER w AS speed_1,
>   LAG(speed, 2) OVER w AS speed_2
> FROM topic_source
> WINDOW w AS (
>   PARTITION BY id
>   ORDER BY t
> )
> 我期望得到的结果数据是
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
> "speed_2":null}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
> "speed_2":null}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
> "speed_2":4.0}
>
> 实际得到的结果数据是:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
> "speed_2":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
> "speed_2":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
> "speed_2":6.0}
>
> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: sql-cli执行sql报错

2020-09-28 文章 Benchao Li
(1) 的方式相当于一个shade之后的包,会把所有compile的依赖都打进去。
(2) 的方式的话,你需要自己手工添加所有这个connector的依赖,比如你提到的kafka-clients。
而且,kafka-clients本身的依赖如果你没有打到kafka-clients这个包里面的话,那你也需要把
那些compile依赖也都放进来。所以相当于手工做了一遍maven的依赖处理,而且要想全部都
放进来,应该会有很多。

如果你对kafka-clients有修改,建议自己重新依赖自己修改后的kafka-clients打包一个kafka-sql-connector-kafka

赵一旦  于2020年9月28日周一 下午5:51写道:

>
> 看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。
> 然后flink-sql-connector-kafka做了shade。
>
> 所以看下来,我的那个(1)和(2)理论上效果是一样的。
> 
>
> 顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。
>
> 赵一旦  于2020年9月28日周一 下午5:42写道:
>
> >
> 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar
> > 和(2)flink-sql-connector-kafka**.jar是啥区别呢?
> >
> > 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。
> >
> > Leonard Xu  于2020年9月28日周一 下午4:36写道:
> >
> >> Hi
> >> benchao的回复是的对的,
> >> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> >> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
> >>
> >>
> >> > 相关lib包:
> >> > flink-connector-kafka_2.12-1.10.2.jar
> >> > kafka-clients-0.11.0.3.jar
> >>
> >> 祝好
> >> Leonard
> >
> >
>


-- 

Best,
Benchao Li


Re: 回复:sql-cli执行sql报错

2020-09-27 文章 Benchao Li
kafka的依赖应该是依赖shaded之后的版本,也就是flink-*sql*-connector-kafka***.jar

hl9...@126.com  于2020年9月28日周一 上午10:29写道:

> 确实语法不对。我用了1.10的语法后,执行sql又报了另外一个错误:
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
> 相关的lib依赖包如下:
> [root@rcx51101 lib]# pwd
> /opt/flink-1.10.2/lib
> [root@rcx51101 lib]# ll | grep kafka
> -rw-rw-r-- 1 test test 26169 Sep 28 10:21
> flink-connector-kafka-0.10_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test 54969 Sep 28 10:21
> flink-connector-kafka-0.11_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test 37642 Sep 28 10:21
> flink-connector-kafka-0.9_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test 81912 Aug 17 16:41
> flink-connector-kafka_2.12-1.10.2.jar
> -rw-rw-r-- 1 test test106632 Sep 28 10:22
> flink-connector-kafka-base_2.11-1.10.2.jar
> -rw-rw-r-- 1 test test106632 Aug 17 16:36
> flink-connector-kafka-base_2.12-1.10.2.jar
> -rw-rw-r-- 1 test test   1893564 Jul 24  2018 kafka-clients-2.0.0.jar
>
>
>
> hl9...@126.com
>
> 发件人: 111
> 发送时间: 2020-09-28 09:23
> 收件人: user-zh@flink.apache.org
> 主题: 回复:sql-cli执行sql报错
> 你貌似使用的是flink-1.11的语法。
> 可以修改成flink-1.10的语法试试,参考文档:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
>
> | |
> xinghalo
> |
> |
> xingh...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月28日 09:16,hl9...@126.com 写道:
> flink版本1.10.2,问题重现如下,请问各位大佬是什么原因:
>
> ./sql-client.sh  embedded
> Flink SQL> show tables ;
> [INFO] Result was empty.
>
> Flink SQL> CREATE TABLE tx (
> account_id  BIGINT,
> amount  BIGINT,
> transaction_time TIMESTAMP(3),
> WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'heli01',
> 'properties.bootstrap.servers' = '10.100.51.56:9092',
> 'format'= 'csv'
> );
> [INFO] Table has been created.
>
> Flink SQL> select * from tx ;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector=kafka
> format=csv
> properties.bootstrap.servers=10.100.51.56:9092
> schema.0.data-type=BIGINT
> schema.0.name=account_id
> schema.1.data-type=BIGINT
> schema.1.name=amount
> schema.2.data-type=TIMESTAMP(3)
> schema.2.name=transaction_time
> schema.watermark.0.rowtime=transaction_time
> schema.watermark.0.strategy.data-type=TIMESTAMP(3)
> schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
> topic=heli01
>
> The following factories have been considered:
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>
>
>
> hl9...@126.com
>


-- 

Best,
Benchao Li


Re: Flink SQL 1.11.1 executeSql/SqlUpdate时 SQL validation的一些问题

2020-09-27 文章 Benchao Li
我理解这个是calcite的机制导致的。calcite并不知道Flink一共有多少层schema,其实Flink
自己抽象了三层,也就是catalog.database.table
但是配置CalciteCatalogReader的时候,需要配置一些默认的schema查找规则,这个Flink是配置了两个,
也就是默认的catalog 和 默认的catalog+默认的database
然后calcite在查找的时候会先尝试default_catalog.default_database作为schema,去查找a.b,此时会先把a当做table去查找,并且找不到。
接下来会default_catalog作为schema去查找a.b,此时就找到了。

刘首维  于2020年9月25日周五 下午3:41写道:

> Hi all,
>
>
>  今天在调试1.11 Flink 代码的时候,发现一个没太理解的现象
>
>
>   考虑以下code
>
>
>
>   bsTableEnv.executeSql("create database a")
> bsTableEnv.executeSql( " CREATE TABLE  a.b "(后略))
> bsTableEnv.executeSql("select * from a.b")
>
>
> 然后发现了以下现象:
>
>
> 从图中可以得知,在`DatabaseCalciteSchema` 中
> 我发现下面几个奇怪的点
>
>1.  databaseName 是 ‘'default'
>2.  getTable将 `a`作为参数传入,而不是b (a是库名,b是表名)
>
>
>
> 首先可以确定的是这个发生在validation阶段
>
> 其次我发现特意针对这块做了一次catch `TableNotExistException`的操作
>
> 请问这部分代码的用途和目的是?
>
>
>
>

-- 

Best,
Benchao Li


Re: 回复:flink sql count问题

2020-09-27 文章 Benchao Li
Hi,

试试用这种方式呢:count(1) filter (where name like '南京%')

anonnius  于2020年9月27日周日 下午5:29写道:

> select count(nullif(if(name not like '南京%', '其他', '南京'), '其他'))
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-27 17:23:07,"zya"  写道:
> >你好,链接无法显示,能麻烦再贴下吗
> >
> >
> >--原始邮件--
> >发件人:
>   "user-zh"
> <
> anonn...@126.com;
> >发送时间:2020年9月27日(星期天) 下午5:20
> >收件人:"user-zh" >
> >主题:Re:回复:flink sql count问题
> >
> >
> >
> >hi: '其他', '南京'), '其他'))
> >在 2020-09-27 17:07:39,"zya"  >貌似只能这样了,感谢回答
> >
> >
> >
> >
> >--nbsp;原始邮件nbsp;--
> >发件人:
> "user-zh"
>  >发送时间:nbsp;2020年9月27日(星期天) 下午5:03
> >收件人:nbsp;"user-zh" >
> >主题:nbsp;Re:回复:flink sql count问题
> >
> >
> >
> >你count 也会生成记录啊。 你过滤掉就行nbsp;nbsp; 。 比如 having xxxnbsp;
> ,或者加个filter
> >在 2020-09-27 17:01:06,"zya"  >gt;这个是我现在的做法,但是的问题就是使用sum会在条件没满足时也会在mysql中生成一条记录
> >gt;amp;nbsp;
> >gt;
> >gt;
> >gt;
> >gt;
>
> >gt;--amp;nbsp;原始邮件amp;nbsp;--
> >gt;发件人:nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> "user-zh"nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  >gt;发送时间:amp;nbsp;2020年9月27日(星期天) 下午4:59
> >gt;收件人:amp;nbsp;"user-zh" amp;gt;;
> >gt;
> >gt;主题:amp;nbsp;Re:flink sql count问题
> >gt;
> >gt;
> >gt;
> >gt;最好把null 变成0,amp;nbsp; 你这样amp;nbsp;amp;nbsp;
> sum(if(name like '南京%',1 , 0))
> >gt;在 2020-09-27 16:53:56,"zya"  写道:
> >gt;amp;gt;请教各位:
> >gt;amp;gt;我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> >gt;amp;gt;之前在hive中是这么写的:count(if(name like '南京%',1 ,
> null)),但是flink sql中count不能为null,有什么别的方法能实现该功能吗?
> >gt;amp;gt;使用的是flink1.10.1 blink
> >gt;amp;gt;amp;amp;nbsp;
>


-- 

Best,
Benchao Li


Re: Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 文章 Benchao Li
可以的。有一个实验性质的Fast Emit功能,可以通过如下参数开启:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 10s

Michael Ran  于2020年9月27日周日 下午2:49写道:

> 额,不是5分钟窗口,10秒一个步长往前滑动吗? 我以为滚动是5分钟窗口 5分钟一输出呢。。
> 在 2020-09-27 14:43:57,"赵一旦"  写道:
> >不是滑动窗口哈。是滚动窗口,每10秒触发一次输出。滑动窗口的化逻辑就变了。
> >
> >Michael Ran  于2020年9月27日周日 下午2:39写道:
> >
> >> 滑动窗口
> >> 在 2020-09-27 13:25:37,"赵一旦"  写道:
> >> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?
> >>
>


-- 

Best,
Benchao Li


Re: 关于flink sql的数据类型

2020-09-25 文章 Benchao Li
1.11的话,
string类型是允许:"a":"abc" 和 "a": 123这两种形式的
bigint类型的话:"a": 123 和 "a": "123"也都是合法的

默认如果是字段不存在,会用null来表示;
如果字段解析错误,会抛异常,如果配置了ignoreParseError,则会忽略整条数据。

不知道你上面提到的(1)是怎么测出来的,方便把具体的DDL定义和示例数据贴一下吗?

赵一旦  于2020年9月25日周五 下午2:52写道:

> 我基于1.11测试的。目前来看,json format的2个设置都设置好。然后event
> time部分使用COALESCE将null情况设置为event_time 0。这么做是最好的情况啦。
>
> Benchao Li  于2020年9月25日周五 下午2:08写道:
>
> > 你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。
> >
> > 赵一旦  于2020年9月25日周五 上午11:02写道:
> >
> > > 而且按照string无法接受"a":a,bigint在 "t":"as"情况下会为null。
> > > 这么来看,bigint反而比string还通用了,可以将非法数据通过null录入进来。
> > > string方式反而丢失部分信息了还。
> > >
> > > 赵一旦  于2020年9月25日周五 上午10:57写道:
> > >
> > > > 今天做个测试,发现一些类型的特点,想确认下。
> > > >
> > > > 目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。
> > > >
> > > >
> > > > 发现如下几个特征
> > > > (1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。
> > > > (2)非顶层字段,比如d map,d中的字段 "b": 12则是合法的。
> > > > (3)t字段为bigint类型,并且由此衍生了eventtime。
> > > >  如果数据为 t: abc 则数据直接非法被忽略。
> > > >  如果数据为t: "abc",则t被转为null?
> > > > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。
> > > >
> > > >
> > > > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。
> > > >
> > > > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。
> > > > 我是通过COALESCE解决的。
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: 关于flink sql的数据类型

2020-09-25 文章 Benchao Li
你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。

赵一旦  于2020年9月25日周五 上午11:02写道:

> 而且按照string无法接受"a":a,bigint在 "t":"as"情况下会为null。
> 这么来看,bigint反而比string还通用了,可以将非法数据通过null录入进来。
> string方式反而丢失部分信息了还。
>
> 赵一旦  于2020年9月25日周五 上午10:57写道:
>
> > 今天做个测试,发现一些类型的特点,想确认下。
> >
> > 目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。
> >
> >
> > 发现如下几个特征
> > (1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。
> > (2)非顶层字段,比如d map,d中的字段 "b": 12则是合法的。
> > (3)t字段为bigint类型,并且由此衍生了eventtime。
> >  如果数据为 t: abc 则数据直接非法被忽略。
> >  如果数据为t: "abc",则t被转为null?
> > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。
> >
> >
> > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。
> >
> > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。
> > 我是通过COALESCE解决的。
> >
>


-- 

Best,
Benchao Li


Re: flink sql ddl 是否支持映射多层json

2020-09-24 文章 Benchao Li
这个情况现在是支持的,可以用类似于这种写法:
```SQL
CREATE TABLE MyTable (
  a11 INT,
  a12 VARCHAR,
  a13 ROW
) WITH (...)
```

Roc Marshal  于2020年9月24日周四 下午7:54写道:

> 请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢?
> {
> "a11":1,
> "a12":"1",
> "a13":{
> "a21":1,
> "a22":1,
> "a23":"1"}
> }
>
>
> 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持?
>
>
> 谢谢



-- 

Best,
Benchao Li


Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-24 文章 Benchao Li
我们一般提升作业吞吐能力的步骤就是看作业的反压情况,
- 如果作业完全没有反压,说明此时处理能力大于上游数据产生速度
- 如果作业有反压,就具体看下反压的是哪个算子,存在什么瓶颈。比如网络IO、磁盘IO、CPU;
  当然,有时候内存问题也会表现为CPU现象,比如GC比较严重

范超  于2020年9月24日周四 上午10:48写道:

> 谢谢Benchao哥回复。
>
> 这几天一直忙着压测这个问题。
> 经多轮压测(先灌满kafka数据),再去消费。
> 发现确实是您说的问题中的第三个情况
> 由于kafka的topic只开了一个partition
>
> 所以flinkkafkaconsumer按照一个taskmanger对应了一个kafka的parition的方式进行了处理。从而导致虽然作业并发度够大,但是由于只有一个partition,
> 其他并发的taskmanager无法获取到更多的partition进行消费,从而导致并行度提升而作业消费能力却无法同比增大。
>
> 之后通过建立2个partition的topic,实现了消费能力的翻倍。
>
>
> 想再请多问您一句,我如果想压出作业的极限吞吐量,请问该如何设置一些运行参数,目前我通过设置on yarn
> 的tm的内存大小,kafka的partition数目,也无法将作业的吞吐量压上去。
>
>
>
> -邮件原件-
> 发件人: Benchao Li [mailto:libenc...@apache.org]
> 发送时间: 2020年9月18日 星期五 18:49
> 收件人: user-zh 
> 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
>
> 提交两个作业的话,两个作业是完全独立的,都会消费全量数据。
>
> 一个作业的消费能力不行,可以具体看下瓶颈在哪里,比如:
> 1. 作业是否有lag,如果没有lag,那其实是没有问题的
> 2. 如果作业有lag,而且lag还在上涨,说明当前消费能力不足,此时可以看下作业具体的瓶颈在哪里
> 有可能是某个算子在反压导致整个作业的消费能力不足
> 也有可能是作业的整体CPU资源不足导致的
> 也有一种极端情况是,作业的并发度已经足够大,source subtask已经对应一个kafka
> partition了,但是消费能力还是不足,这个时候其实是单个partition数据量太大,对应到Flink的source算子处理能力不足导致的
> 3. 如果作业当前有lag,但是lag在下降,说明消费能力其实是够的,只是数据有些积压
>
> 范超  于2020年9月18日周五 下午4:07写道:
>
> > 各位好,我遇到了一个奇怪的问题
> >
> > 我是使用flink1.10和 flink-connector-kafka_2.11
> >
> > 使用Flink on yarn 模式运行,无论怎么调大并行度。Kafka节点(我使用的单节点)的网卡输出速度一直上不去。
> >
> > 但是提交两个同样的应用同样使用FLink on Yarm模式,Kafka节点的网卡输出速度是正常翻倍的。
> >
> > 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。
> >
> > 求各位大佬指导
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: flink sql延迟数据

2020-09-24 文章 Benchao Li
这个目前还不能,但是在1.12是可以的,已经在这个issue[1] 中添加了这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18555

ang <806040...@qq.com> 于2020年9月24日周四 上午11:19写道:

> 感谢benchao,请问下这部分只能通过config来设置吗,有没有可以直接在sql中设置的配置项
>
>
> 
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org;
> 发送时间:2020年9月23日(星期三) 下午5:22
> 收件人:"user-zh"
> 主题:Re: flink sql延迟数据
>
>
>
> 你是用的Blink planner的TUMBLE window么,如果是的话,可以通过设置state
> retention[1]时间来处理late数据的。
> 具体的allow lateness的时间就是你设置的min retention time
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> ang <806040...@qq.com 于2020年9月23日周三 下午4:24写道:
>
>  hi各位,有个问题请教一下:
>  我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event
>  time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为
>  WATERMARK FOR ts AS tsnbsp; - INTERVAL '5' SECODND
>  ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream
> api里面可以使用allowed
>  lateness,但是这部分在sql中没看到有相关语法
> 
> 
>  Flink版本1.10.1
>  nbsp;
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: [DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-24 文章 Benchao Li
+1

nashcen <2415370...@qq.com> 于2020年9月24日周四 下午1:09写道:

> +1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flink sql延迟数据

2020-09-23 文章 Benchao Li
你是用的Blink planner的TUMBLE window么,如果是的话,可以通过设置state retention[1]时间来处理late数据的。
具体的allow lateness的时间就是你设置的min retention time

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

ang <806040...@qq.com> 于2020年9月23日周三 下午4:24写道:

> hi各位,有个问题请教一下:
> 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event
> time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为
> WATERMARK FOR ts AS ts - INTERVAL '5' SECODND
> ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream api里面可以使用allowed
> lateness,但是这部分在sql中没看到有相关语法
>
>
> Flink版本1.10.1
> 



-- 

Best,
Benchao Li


Re: flink pb转json性能问题

2020-09-23 文章 Benchao Li
Hi kandy,

关于第1个问题,目前社区有计划做一个内置的pb format[1],可能大概率赶不上1.12了,不过应该1.13差不多。

[1] https://issues.apache.org/jira/browse/FLINK-18202

kandy.wang  于2020年9月23日周三 下午4:55写道:

> 因flink目前不支持pb format,调用了,protobuf-java-util
> com.google.protobuf.utilJsonFormat.printer().preservingProtoFieldNames().print(message)
> 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json,
> 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps.
> 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2
> 、社区对pb format 会支持么?
> 3、pb转json 有什么性能比较好的工具包



-- 

Best,
Benchao Li


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。

郑斌斌  于2020年9月23日周三 下午12:29写道:

>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
> 单流跑的话,比较正常。
> JOB的内存是4G。版本1.11.1
> ------
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
> join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
> `Math.max(leftRelativeSize, rightRelativeSize) +
> allowedLateness`,根据你的SQL,这个值应该是6h
> 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
> 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> 2;`,在你的SQL来讲,就是3h,也就是说
> 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
>
> 希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li  于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c_id`,
> > > `b`.`openid`
> > > FROM `test_table_a` AS `a`
> > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > AND `a`.`openid` = `b`.`openid`
> > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> SECOND
> > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > >
> > >
> > 【window】
> >
> > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> AS
> > > `rowtime`,
> > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__windoow_start__`,
> > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > `__window_end__`,
> > > `c_id`,
> > > COUNT(`openid`) AS `cnt`
> > > FROM `test_table_in_6h`
> > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > `c_id`
> > >
> >
> >
> > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
> >
> > 【配置】
> >
> > > cat conf/flink-conf.yaml
> > > jobmanager.rpc.address: flink-jobmanager
> > > taskmanager.numberOfTaskSlots: 1
> > > blob.server.port: 6124
> > > jobmanager.rpc.port: 6123
> > > taskmanager.rpc.port: 6122
> > > jobmanager.heap.size: 6144m
> > > taskmanager.memory.process.size: 4g
> > > taskmanager.memory.jvm-overhead.min: 1024m
> > > taskmanager.memory.jvm-overhead.max: 2048m
> > > taskmanager.debug.memory.log-interval: 1
> > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > -XX:NumberOfGCLogFiles=10
> > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > >
> >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>
>
> --
>
> Best,
> Benchao Li
>
>

-- 

Best,
Benchao Li


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
Hi Tianwang,

不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
`Math.max(leftRelativeSize, rightRelativeSize) +
allowedLateness`,根据你的SQL,这个值应该是6h
2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
`minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
2;`,在你的SQL来讲,就是3h,也就是说
状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

希望这个可以解答你的疑惑~

[1] https://issues.apache.org/jira/browse/FLINK-18996

Tianwang Li  于2020年9月22日周二 下午8:26写道:

> 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
>
>
> 【join】
>
> > SELECT `b`.`rowtime`,
> > `a`.`c_id`,
> > `b`.`openid`
> > FROM `test_table_a` AS `a`
> > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > AND `a`.`openid` = `b`.`openid`
> > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND
> > AND `a`.`rowtime` + INTERVAL '6' HOUR
> >
> >
> 【window】
>
> > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `rowtime`,
> > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__windoow_start__`,
> > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > `__window_end__`,
> > `c_id`,
> > COUNT(`openid`) AS `cnt`
> > FROM `test_table_in_6h`
> > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > `c_id`
> >
>
>
> 我配置了Fink的内存是4G, 实际使用达到了6.8 G。
> 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右
>
> 【配置】
>
> > cat conf/flink-conf.yaml
> > jobmanager.rpc.address: flink-jobmanager
> > taskmanager.numberOfTaskSlots: 1
> > blob.server.port: 6124
> > jobmanager.rpc.port: 6123
> > taskmanager.rpc.port: 6122
> > jobmanager.heap.size: 6144m
> > taskmanager.memory.process.size: 4g
> > taskmanager.memory.jvm-overhead.min: 1024m
> > taskmanager.memory.jvm-overhead.max: 2048m
> > taskmanager.debug.memory.log-interval: 1
> > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
>  tivanli
> **
>


-- 

Best,
Benchao Li


Re: 消费kafka source反压

2020-09-21 文章 Benchao Li
这个性能影响指的是跟那种情况进行对比呢?

smq <374060...@qq.com> 于2020年9月21日周一 下午6:49写道:

> 谢谢,多问一句,并行度为1的话,keyby算子加上keydstate对性能影响大吗
>
>
>
> ---原始邮件---
> 发件人: "Benchao Li" 发送时间: 2020年9月21日(周一) 下午4:39
> 收件人: "user-zh" 主题: Re: 消费kafka source反压
>
>
> 这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。
>
> smq <374060...@qq.com 于2020年9月21日周一 下午2:08写道:
>
> 
> 
> 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗?
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
可以通过SQL的where条件来过滤吧

chuyuan  于2020年9月21日周一 下午6:48写道:

> 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
 libMap = (Map)
> JSON.parse(pointDO.getLib());
> }
> this.setLib(libMap);
> }
> }
>
> 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
> "CREATE TABLE test.test(" +
> "  type STRING," +
> "  lib MAP,"
> +
> "  properties
> MAP" +
> ") PARTITIONED BY (" +
> "  dt string" +
> " ) stored as orcfile " +
> " TBLPROPERTIES" +
>     " (" +
>
> "'partition.time-extractor.kind'='custom'," +
>
> "'partition.time-extractor.timestamp-pattern'='$dt'," +
>
>
> "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
> +
>
> "'sink.partition-commit.trigger'='partition-time'," +
>
> "'sink.partition-commit.delay'='0s'," +
>
> "'sink.partition-commit.policy.kind'='metastore'" +
> ")");
>
>
> 第三步,把临时表的数据insert into到目标表,此时出现异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
>
> 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为:
>  |-- lib: LEGACY('RAW', 'ANY')
>  |-- properties: LEGACY('RAW', 'ANY')
>  |-- track_id: BIGINT
>  |-- type: STRING
> ,这说明lib LEGACY('RAW', 'ANY')无法匹配hive目标表中lib
> MAP数据结构,写入失败,大概流程是这样。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 消费kafka source反压

2020-09-21 文章 Benchao Li
这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。

smq <374060...@qq.com> 于2020年9月21日周一 下午2:08写道:

>
> 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗?



-- 

Best,
Benchao Li


Re: Re: [SQL] parse table name from sql statement

2020-09-21 文章 Benchao Li
我感觉可以先把SQL转成RelNode,然后用Calcite的visitor模式的RelShuttle来获取?

Harold.Miao  于2020年9月21日周一 下午1:58写道:

> 主要是我没有完整的所有单元case, 总是感觉写的不完整。
>
> 郭士榕  于2020年9月21日周一 上午11:08写道:
>
> >
> >
> >
> > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-09-21 10:50:31,"Harold.Miao"  写道:
> > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。
> > >
> > >郭士榕  于2020年9月21日周一 上午10:21写道:
> > >
> > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2020-09-21 10:12:13,"Harold.Miao"  写道:
> > >> >hi all
> > >> >
> > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> > >> >
> > >> >谢谢
> > >> >
> > >> >--
> > >> >
> > >> >Best Regards,
> > >> >Harold Miao
> > >>
> > >
> > >
> > >--
> > >
> > >Best Regards,
> > >Harold Miao
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
Hi chuyuan,

可以详细描述下你遇到的问题么,比如下面这些信息
- 用的是哪个Flink版本
- SQL(包括DDL和query)
- 数据是什么样子的

chuyuan  于2020年9月21日周一 下午2:40写道:

>  LEGACY('RAW',
> 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
> 方便说下具体实现细节吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-18 文章 Benchao Li
提交两个作业的话,两个作业是完全独立的,都会消费全量数据。

一个作业的消费能力不行,可以具体看下瓶颈在哪里,比如:
1. 作业是否有lag,如果没有lag,那其实是没有问题的
2. 如果作业有lag,而且lag还在上涨,说明当前消费能力不足,此时可以看下作业具体的瓶颈在哪里
有可能是某个算子在反压导致整个作业的消费能力不足
也有可能是作业的整体CPU资源不足导致的
也有一种极端情况是,作业的并发度已经足够大,source subtask已经对应一个kafka
partition了,但是消费能力还是不足,这个时候其实是单个partition数据量太大,对应到Flink的source算子处理能力不足导致的
3. 如果作业当前有lag,但是lag在下降,说明消费能力其实是够的,只是数据有些积压

范超  于2020年9月18日周五 下午4:07写道:

> 各位好,我遇到了一个奇怪的问题
>
> 我是使用flink1.10和 flink-connector-kafka_2.11
>
> 使用Flink on yarn 模式运行,无论怎么调大并行度。Kafka节点(我使用的单节点)的网卡输出速度一直上不去。
>
> 但是提交两个同样的应用同样使用FLink on Yarm模式,Kafka节点的网卡输出速度是正常翻倍的。
>
> 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。
>
> 求各位大佬指导
>


-- 

Best,
Benchao Li


Re: 关于flinksql 滑动窗口数据进不来的问题

2020-09-14 文章 Benchao Li
那看起来就是watermark的问题了。你可以在Flink web UI上查看一下对应的算子的watermark是否符合预期。

有一个小tip,watermark本身是由数据来驱动更新的。比如你只有一条数据,那么你的watermark就只能是根据
这条数据计算出来的,不会自动再更新。

李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午5:27写道:

> 在进入stream之前是有数据的,使用hop方法计算之后就没有数据流出了。
>
>
> 水印的设置代码如下:
> simpleResults.assignTimestampsAndWatermarks(WatermarkStrategy
> . .withTimestampAssigner((event,
> timestamp)-event.getGmtPaidLong())
> .withIdleness(Duration.ofSeconds(5)));
> ---
> 另外 刚刚我用了processTime做窗口滑动是可以实现的,但是processTime对业务不友好,因此如果根据rowTime可以做是最好的。
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org;
> 发送时间:2020年9月14日(星期一) 下午5:19
> 收件人:"user-zh"
> 主题:Re: 关于flinksql 滑动窗口数据进不来的问题
>
>
>
> 可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢?
>
> 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。
>
> 李杨烨 <438106...@qq.com 于2020年9月14日周一 下午1:32写道:
>
>  刚刚邮件图片挂了,上传了新的图片地址:
> http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
>  使用rowTime做的滑动
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: 关于flinksql 滑动窗口数据进不来的问题

2020-09-14 文章 Benchao Li
可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢?

如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。

李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午1:32写道:

> 刚刚邮件图片挂了,上传了新的图片地址:http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
> 使用rowTime做的滑动



-- 

Best,
Benchao Li


Re: 关于使用flinksql 生成滑动窗口 table数据进不来的问题

2020-09-13 文章 Benchao Li
你好,你的图片挂了,可以把图片放到第三方图床工具然后把链接发出来。或者直接用文本描述的问题。

李杨烨 <438106...@qq.com> 于2020年9月14日周一 上午11:25写道:

>
> 根据rowTime做的滑动



-- 

Best,
Benchao Li


Re: UDAF函数在over窗口使用问题

2020-09-13 文章 Benchao Li
Hi,

看起来你并没有实现`retract` 方法,正常来讲,over window在处理过期数据的时候,会将过期的数据进行一次retract计算。
所以你需要正确的实现一下retract方法。

chen310 <1...@163.com> 于2020年9月14日周一 上午10:01写道:

> flink版本 1.11.1
>
> 实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下:
>
> public class AggDistinctDetail extends AggregateFunction AggDistinctDetail.Details> {
> private static final Logger logger =
> LoggerFactory.getLogger(AggDistinctDetail.class);
>
> public static class Details {
> public Set set;
> }
>
> @Override
> public Details createAccumulator() {
> return new Details();
> }
>
> @Override
> public String getValue(Details acc) {
> return JSON.toJSONString(acc.set);
> }
>
> public void accumulate(Details acc, String val) {
> if (acc.set == null) {
> acc.set = new HashSet<>();
> }
> acc.set.add(val);
> }
>
> public void retract(Details acc, String val) {
> //now, agg detail don't need support retraction
> }
>
> public void merge(Details acc, Iterable it) {
> Iterator iter = it.iterator();
> if (acc.set == null) {
> acc.set = new HashSet<>();
> }
> while (iter.hasNext()) {
> Details a = iter.next();
> acc.set.addAll(a.set);
> }
> }
>
> public void resetAccumulator(Details acc) {
> acc.set = null;
> }
> }
>
> 将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime)
> requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。
>
> drop function if exists UDF_InfoDistinctMerge;
> create function UDF_InfoDistinctMerge AS
> 'com.binance.risk.flink.udf.AggDistinctDetail';
>
> select
> realIp ,
> UDF_InfoDistinctMerge(userId) over w1 as userSet
> from source_table
> window w1 as (partition by realIp order by requestDateTime asc RANGE
> BETWEEN
> INTERVAL '24' hour preceding AND CURRENT ROW) ;
>
> 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
>
> 问题:
> 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 Benchao Li
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。

last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。

lec ssmi  于2020年9月10日周四 下午2:35写道:

> 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
> 感觉底层和 last_value() group by id是一样的。
>
> Benchao Li  于2020年9月10日周四 上午10:34写道:
>
> >
> >
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> > 如果还有自己的binlog格式,也可以自定义format来实现。
> >
> > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为
> > 1. append / update_after 消息会累加到聚合指标上
> > 2. delete / update_before 消息会从聚合指标上进行retract
> >
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
> > [2]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html
> >
> > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:
> >
> > > 请问第1点是有实际的案例使用了么?
> > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> > > 谢谢.
> > >
> > >
> > >
> > >
> > > --原始邮件--
> > > 发件人:
> > >   "user-zh"
> > > <
> > > libenc...@apache.org;
> > > 发送时间:2020年9月9日(星期三) 中午1:09
> > > 收件人:"user-zh" > >
> > > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
> > >
> > >
> > >
> > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> > > 1. 首先版本是1.11+, 可以直接用binlog
> > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
> > >  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> > > yyy这种,那这个sum指标会自动做好这件事。
> > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1]
> > 将append数据流转成retract数据流,这样下游再用同样的
> > >  聚合逻辑,效果也是一样的。
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > >
> > >
> > > xuzh  > >
> > >  场景:
> > >  nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。
> > >  nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
> > >  nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka
> ,GMV实时统计为1000.
> > >  nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> > >  这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp;
> > >  请问是不是根据 update /delete
> > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> > > 
> > > 
> > >  nbsp; 刚入坑实时处理,请大神赐教
> > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 Benchao Li
1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
如果还有自己的binlog格式,也可以自定义format来实现。

只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为
1. append / update_after 消息会累加到聚合指标上
2. delete / update_before 消息会从聚合指标上进行retract


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html

忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道:

> 请问第1点是有实际的案例使用了么?
> 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录?
> 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的
> 谢谢.
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org;
> 发送时间:2020年9月9日(星期三) 中午1:09
> 收件人:"user-zh"
> 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
>
>
>
> 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
> 1. 首先版本是1.11+, 可以直接用binlog
> format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
>  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
> yyy这种,那这个sum指标会自动做好这件事。
> 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的
>  聚合逻辑,效果也是一样的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
>
> xuzh 
>  场景:
>  nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。
>  nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
>  nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000.
>  nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
>  这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp;
>  请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
> 
> 
>  nbsp; 刚入坑实时处理,请大神赐教
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-08 文章 Benchao Li
不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩:
1. 首先版本是1.11+, 可以直接用binlog
format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink
  内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by
yyy这种,那这个sum指标会自动做好这件事。
2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的
  聚合逻辑,效果也是一样的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication


xuzh  于2020年9月8日周二 下午5:56写道:

> 场景:
>  实时统计每天的GMV,但是订单金额是会修改的。
>  订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。
>  假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000.
>  然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> 这时如果不减去上午已经统计的金额。那么总金额就是错的。
> 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
>
>
>  刚入坑实时处理,请大神赐教



-- 

Best,
Benchao Li


Re: how flink-sql connector kafka reads array json

2020-09-07 文章 Benchao Li
Hi,

这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。

[1] https://issues.apache.org/jira/browse/FLINK-18590

大罗  于2020年9月8日周二 上午10:39写道:

> hi,大家好,我遇到一个问题。
>
> 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
> "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?
>
> CREATE TABLE mykafka1 (name String, age Int)
> WITH (
>'connector.type' = 'kafka',
>'format.type' = 'json',
>'update-mode' = 'append'
> );
>
>
> 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: FlinkSQL如何处理上游的表结构变更

2020-09-05 文章 Benchao Li
我理解SQL本身都是强类型的,要处理这种schema会变更的情况可能本身就不是很合适。

CC 云邪,不知道云邪大佬对这个有没有更好的想法。

忝忝向仧 <153488...@qq.com> 于2020年9月5日周六 下午4:50写道:

> 同问如果是上游表结构变更没有及时通知到下游,数据同步这块就会报错
> 有没什么办法解决?
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> flin...@163.com;
> 发送时间:2020年9月4日(星期五) 下午2:18
> 收件人:"user-zh"
> 主题:FlinkSQL如何处理上游的表结构变更
>
>
>
> Hi all:
> flink version : 1.11.0
> 场景:上游的数据来自binlog,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create
> table时写死的,有什么办法可以处理这种场景呢



-- 

Best,
Benchao Li


Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-03 文章 Benchao Li
+
> "from (select appid,eventid,count(*) as cnt," +
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
> "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1'
> HOUR),TIME '00:00:00')";
>
>
> Table table = tenv.sqlQuery(sql);
> DataStream dataStream = tenv.toAppendStream(table, Result.class);
> dataStream.print();
>
> env.execute("etl.exception.monitor.ExceptionAlertHour");
> }
>
>
>
> public static class Result{
> private String appid;
> private String eventid;
> private long cnt;
> private Timestamp stime;
> private Timestamp etime;
> public String getAppid() {
> return appid;
> }
>
> public void setAppid(String appid) {
> this.appid = appid;
> }
>
> public String getEventid() {
> return eventid;
> }
>
> public void setEventid(String eventid) {
> this.eventid = eventid;
> }
>
> public long getCnt() {
> return cnt;
> }
>
> public void setCnt(long cnt) {
> this.cnt = cnt;
> }
>
>
> public Timestamp getStime(){
> return stime;
> }
>
> public void setStime(Timestamp stime){
> this.stime = stime;
> }
>
> public Timestamp getEtime(){
> return etime;
> }
>
> public void setEtime(Timestamp etime){
> this.etime = etime;
> }
>
> @Override
> public String toString(){
> return "ResultHour{" +
>   "appid=" + appid +
>   ",eventid=" + eventid +
>   ",cnt=" + cnt +
>   ", stime=" + stime +
>   ", etime=" + etime +
>   ", SystemTime=" + System.currentTimeMillis() +
>   '}';
> }
> }
>
> }
>
>
> 发件人: jacky-cui
> 发送时间: 2020-09-02 18:58
> 收件人: user-zh
> 主题: 回复:请指教一个关于时间窗的问题,非常感谢!
> 你这个flink是什么版本,能贴全代码吗
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> samuel@ubtrobot.com;
> 发送时间:2020年9月2日(星期三) 下午3:20
> 收件人:"user-zh"
> 主题:请指教一个关于时间窗的问题,非常感谢!
>
>
>
> 大牛好:使用flink SQL,希望可以通过tumble window(每小时)来计算,现在遇到问题是到整点时,是没有触发计算的,请帮忙看看!
> 
> //指定eventtime字段及生成watermark
> DataStream withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
>
> . //. .withIdleness(Duration.ofSeconds(10)) //即时没数据时,也生成watermark
> .withTimestampAssigner((event, timestamp)-event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
> 
> "(starttime + interval '8' hour ) as stime," +
> 
> "(endtime + interval '8' hour ) as etime "
> +
>  "from
> (select appid,eventid,count(*) as cnt," +
> 
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
> 
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
>  "from
> log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME
> '00:00:00')"; //希望整点结束时触发时间窗关闭
>
> Table table = tenv.sqlQuery(sql);
> DataStream Result.class);
>
> 输出的结果是:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> 期待的是2020-09-01 18:00:00.0结束时触发关闭窗口,结果是没有的。
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
> 等到这条数据上来后才触发
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01
> 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481
> //2020/9/2 15:23:35}
> 请问一下哪里出了问题?万分感谢!
>


-- 

Best,
Benchao Li


Re: flink json ddl解析

2020-09-02 文章 Benchao Li
Hi,
如果声明为 ARRAY 是否可以满足你的需求呢?如果可以的话,你可以在
1.12之后使用这个feature[1].

[1] https://issues.apache.org/jira/browse/FLINK-18002

zilong xiao  于2020年9月1日周二 下午5:49写道:

> 问题大概懂了,坐等Flink大佬回复
>
> Dream-底限  于2020年9月1日周二 下午4:43写道:
>
> > hi
> > 就是json数组如果是这种:[1,2,3],我可以直接array解析
> >
> >
> 如果json数组是这种:[1,"test",true],如果我用array>程序是没办法运行的,如果我用array > int,b string,c boolean>>,flink做ddl翻译解析json的时候会把row > boolean>这一部分映射为解析jsonobject,但是array元素不是jsonobject会导致取不到数据
> >
> > zilong xiao  于2020年9月1日周二 下午4:04写道:
> >
> > > 基本类型包装一层会导致解析不出来  这个没太明白,可以举个列子吗?
> > >
> > > Dream-底限  于2020年9月1日周二 下午2:20写道:
> > >
> > > > hi、
> > > >
> > >
> >
> 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况
> > > >
> > > > zilong xiao  于2020年9月1日周二 上午11:47写道:
> > > >
> > > > > like this:  ARRAY > > > String>>>
> > > > >
> > > > > Dream-底限  于2020年9月1日周二 上午11:40写道:
> > > > >
> > > > > > hi
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json
> > > > > >
> array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常
> > > > > >
> > > > > >
> > > > > > private static TypeInformation convertArray(String location,
> > > > > > JsonNode node, JsonNode root) {
> > > > > >// validate items
> > > > > >if (!node.has(ITEMS)) {
> > > > > >   throw new IllegalArgumentException(
> > > > > >  "Arrays must specify an '" + ITEMS + "' property in
> node:
> > "
> > > +
> > > > > > location);
> > > > > >}
> > > > > >final JsonNode items = node.get(ITEMS);
> > > > > >
> > > > > >// list (translated to object array)
> > > > > >if (items.isObject()) {
> > > > > >   final TypeInformation elementType = convertType(
> > > > > >  location + '/' + ITEMS,
> > > > > >  items,
> > > > > >  root);
> > > > > >   // result type might either be ObjectArrayTypeInfo or
> > > > > > BasicArrayTypeInfo for Strings
> > > > > >   return Types.OBJECT_ARRAY(elementType);
> > > > > >}
> > > > > >// tuple (translated to row)
> > > > > >else if (items.isArray()) {
> > > > > >   final TypeInformation[] types = convertTypes(location +
> > '/'
> > > +
> > > > > > ITEMS, items, root);
> > > > > >
> > > > > >   // validate that array does not contain additional items
> > > > > >   if (node.has(ADDITIONAL_ITEMS) &&
> > > > > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > > > > >  throw new IllegalArgumentException(
> > > > > > "An array tuple must not allow additional items in
> > node:
> > > "
> > > > > > + location);
> > > > > >   }
> > > > > >
> > > > > >   return Types.ROW(types);
> > > > > >}
> > > > > >throw new IllegalArgumentException(
> > > > > >   "Invalid type for '" + ITEMS + "' property in node: " +
> > > > location);
> > > > > > }
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


Re: flink1.11时间函数

2020-08-28 文章 Benchao Li
不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。

Dream-底限  于2020年8月28日周五 下午2:50写道:

> hi
>
> UNIX_TIMESTAMP()
>
> NOW()
>
> 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
>


-- 

Best,
Benchao Li


Re: Flink 维表延迟join

2020-08-27 文章 Benchao Li
这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可,
这样可以用到批里面一些特有的join优化。

魏烽  于2020年8月28日周五 上午9:58写道:

> 各位好:
>
>
> 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案:
>
> 1.直接将维表一次性加载到内存进行join;
>
> 2.使用mysql或者hbase外部存储每条数据进行查询join;
>
> 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>
> 请问各位有什么更好的建议嘛?感谢
>
>  原始邮件
> 发件人: Leonard Xu
> 收件人: Jark Wu
> 抄送: user-zh; Benchao Li
> 发送时间: 2020年8月27日(周四) 20:11
> 主题: Re: Flink 维表延迟join
>
>
> 多谢 Jark 提议
>
> Issue[1] 建好了, 大家可以在issue下讨论。
>
> 祝好
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-19063 <
> https://issues.apache.org/jira/browse/FLINK-19063>
>
>
> > 在 2020年8月27日,19:54,Jark Wu mailto:imj...@gmail.com>>
> 写道:
> >
> > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。
> >
> > On Thu, 27 Aug 2020 at 11:12, Leonard Xu  xbjt...@gmail.com> <mailto:xbjt...@gmail.com<mailto:xbjt...@gmail.com>>>
> wrote:
> >
> > Hi, all
> >
> > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了,
> > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗?
> >
> > Best
> > Leonard
> >
> > > 在 2020年8月27日,10:39,china_tao  taochangl...@163.com> <mailto:taochangl...@163.com taochangl...@163.com>>> 写道:
> > >
> > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
> > >
> join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
> > > 个人推荐先用null存储,后期etl补录。
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/ <
> http://apache-flink.147419.n8.nabble.com/>
> >
>
>
>
>

-- 

Best,
Benchao Li


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Benchao Li
Congratulations Dian!

Cranmer, Danny  于2020年8月27日周四 下午10:55写道:

> Congratulations Dian! :D
>
> On 27/08/2020, 15:25, "Robert Metzger"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
> wrote:
>
> > Congratulations Dian
> > Best,
> > Congxian
> >
> >
> > Xintong Song  于2020年8月27日周四 下午7:50写道:
> >
> > > Congratulations Dian~!
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu  wrote:
> > >
> > > > Congratulations Dian!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
> wrote:
> > > >
> > > > > Congrats, Dian!  Well deserved.
> > > > >
> > > > > Best
> > > > > Leonard
> > > > >
> > > > > > 在 2020年8月27日,19:34,Kurt Young  写道:
> > > > > >
> > > > > > Congratulations Dian!
> > > > > >
> > > > > > Best,
> > > > > > Kurt
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li <
> lirui.fu...@gmail.com>
> > > wrote:
> > > > > >
> > > > > >> Congratulations Dian!
> > > > > >>
> > > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei <
> yuanmei.w...@gmail.com>
> > > > > wrote:
> > > > > >>
> > > > > >>> Congrats!
> > > > > >>>
> > > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang <
> hxbks...@gmail.com
> > >
> > > > > wrote:
> > > > > >>>
> > > > > >>>> Congratulations Dian!
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Xingbo
> > > > > >>>>
> > > > > >>>> jincheng sun  于2020年8月27日周四
> 下午5:24写道:
> > > > > >>>>
> > > > > >>>>> Hi all,
> > > > > >>>>>
> > > > > >>>>> On behalf of the Flink PMC, I'm happy to announce that
> Dian Fu
> > is
> > > > now
> > > > > >>>>> part of the Apache Flink Project Management Committee
> (PMC).
> > > > > >>>>>
> > > > > >>>>> Dian Fu has been very active on PyFlink component,
> working on
> > > > various
> > > > > >>>>> important features, such as the Python UDF and Pandas
> > > integration,
> > > > > and
> > > > > >>>>> keeps checking and voting for our releases, and also has
> > > > successfully
> > > > > >>>>> produced two releases(1.9.3&1.11.1) as RM, currently
> working as
> > > RM
> > > > > to push
> > > > > >>>>> forward the release of Flink 1.12.
> > > > > >>>>>
> > > > > >>>>> Please join me in congratulating Dian Fu for becoming a
> Flink
> > PMC
> > > > > >>>>> Member!
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Jincheng(on behalf of the Flink PMC)
> > > > > >>>>>
> > > > > >>>>
> > > > > >>
> > > > > >> --
> > > > > >> Best regards!
> > > > > >> Rui Li
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
>

-- 

Best,
Benchao Li


Re: Flink 维表延迟join

2020-08-26 文章 Benchao Li
Hi,

我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。

其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。

郑斌斌  于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks



-- 

Best,
Benchao Li


Re: 关于sink失败 不消费kafka消息的处理

2020-08-26 文章 Benchao Li
Hi Eleanore,shizk233 同学给出的解释已经很全面了。

对于你后面提的这个问题,我感觉这个理解应该不太正确。
开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候
会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是
at least once的语义,也就是数据可能会重复,但是不会丢。

Eleanore Jin  于2020年8月27日周四 上午9:53写道:

> Hi shizk233,
>
> 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink topic,
> 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send,
> 那么如果开启checkpoint, state 就只是source operator kafka offset.
>
> 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有,
> 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6.
> 假如这个时候publish message 4 失败了, 那么job restart from last successful checkpoint,
> source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗
>
> 谢谢!
> Eleanore
>
> On Wed, Aug 26, 2020 at 9:32 AM shizk233 
> wrote:
>
> > Hi Eleanore,这个问题我可以提供一点理解作为参考
> >
> > 1.chk与at least once
> > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
> > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
> >
> > 2. sink2PC
> > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
> > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
> > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。
> >
> > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。
> >
> > 3.kafka auto commit
> > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk
> > n的时候才提交offset。
> > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk n的完成。
> >
> > Eleanore Jin  于2020年8月26日周三 下午11:51写道:
> >
> > > Hi Benchao
> > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果
> > sink
> > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> commit
> > > offset 看起来似乎没有什么区别
> > >
> > > 可否具体解释一下? 谢谢!
> > >
> > > Eleanore
> > >
> > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li 
> wrote:
> > >
> > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
> > > >
> > > > 范超  于2020年8月26日周三 上午11:38写道:
> > > >
> > > > > 大家好,我现在有个疑问
> > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
> > > > >
> > > > >
> > > >
> > >
> >
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
> > > > >
> > > > >
> > > > > 多谢大家了
> > > > >
> > > > > 范超
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
>


-- 

Best,
Benchao Li


Re: 关于sink失败 不消费kafka消息的处理

2020-08-25 文章 Benchao Li
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超  于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


-- 

Best,
Benchao Li


Re: flink1.11 sql问题

2020-08-25 文章 Benchao Li
Hi,

这个功能已经在1.12支持了[1],如果着急使用,可以cherry-pick回去试试看。
用法就是直接把这个字段声明为varchar,json format会帮你自动处理

[1] https://issues.apache.org/jira/browse/FLINK-18002

酷酷的浑蛋  于2020年8月25日周二 下午6:32写道:

>
>
> 还没到udf那一步,直接用create table的方式,过来的数据就是获取不到值的,
> CREATE TABLE test (
> a VARCHAR,
> b INT
>  ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = 'xxx',
> 'properties.group.id' = 'groupid',
> 'scan.startup.mode' = 'group-offsets',
> 'format'='json'
> );
>
>
>
>
> 在2020年08月25日 16:14,Jim Chen 写道:
> 这个需要你自定义UDF
>
> 酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:
>
> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
>
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>
>

-- 

Best,
Benchao Li


Re: Elasticsearch 写入问题

2020-08-23 文章 Benchao Li
你指的是用json format么?这个是json format的问题,当前的确是会把所有字段都写进去,不管是否为null。

我们内部也有类似需求,我们是修改了json format,可以允许忽略null的列。

 于2020年8月23日周日 下午6:11写道:

> 在flink1.11中使用table sql写入时,有一些字段为空时,依然被写入到elasticsearch,这个方式应该不太恰当。
>
>
>

-- 

Best,
Benchao Li


Re: 1.11版本,关于TableEnvironment.executeSql("insert into ..."),job名称设置的问题

2020-08-23 文章 Benchao Li
FYI: 有一个issue[1] 正在跟进和解决这个问题

[1] https://issues.apache.org/jira/browse/FLINK-18545

Zou Dan  于2020年8月23日周日 下午2:29写道:

> 据我所知,这种执行方式目前没法设置 jobName
>
> > 2020年8月21日 上午11:11,Asahi Lee <978466...@qq.com> 写道:
> >
> > 你好!
> >   我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢?
> >
> >
> > 程序:
> > EnvironmentSettings bbSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().build();
> > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
> >
> > String sourceDDL = "CREATE TABLE datagen (  " +
> >" f_random INT,  " +
> >" f_random_str STRING,  " +
> >" ts AS localtimestamp,  " +
> >" WATERMARK FOR ts AS ts  " +
> >") WITH (  " +
> >" 'connector' = 'datagen',  " +
> >" 'rows-per-second'='10',  " +
> >" 'fields.f_random.min'='1',  " +
> >" 'fields.f_random.max'='5',  " +
> >" 'fields.f_random_str.length'='10'  " +
> >")";
> >
> > bsTableEnv.executeSql(sourceDDL);
> > Table datagen = bsTableEnv.from("datagen");
> >
> > System.out.println(datagen.getSchema());
> >
> > String sinkDDL = "CREATE TABLE print_table (" +
> >" f_random int," +
> >" c_val bigint, " +
> >" wStart TIMESTAMP(3) " +
> >") WITH ('connector' = 'print') ";
> > bsTableEnv.executeSql(sinkDDL);
> >
> > System.out.println(bsTableEnv.from("print_table").getSchema());
> >
> > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str),
> TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by
> TUMBLE(ts, INTERVAL '5' second), f_random");
> > bsTableEnv.executeSql("insert into print_table select * from " + table);
>
>
>

-- 

Best,
Benchao Li


Re: Re: 如何设置FlinkSQL并行度

2020-08-21 文章 Benchao Li
Hi forideal,

我在本地试了一下,没有复现你说的这个情况。
我看代码也没有这个逻辑,如果是没有分配到partition,应该是会阻塞住,而不是finish。
你这个测试用的是社区的版本么?还是有什么特殊的改动?

forideal  于2020年8月21日周五 下午11:43写道:

> Hi 赵一旦,
> 基础信息:使用 watermark for 语法设置watermark,Flink SQL,Blink planner,Flink 1.10.0
> 我最近做了一个实验,将Flink SQL 的并发设置为 kafka topic partition 的 2 倍,同时设置 idle 的时间为 10s。
> 这时,1.source 会有一半的partition 立马就 finished
> 2.下游的 workmark 变成了LONG的最大值
> 整个任务都无法正常运行了。
>
>
> Best forideal
>
>
>
>
> 在 2020-08-17 10:05:48,"Zhao,Yi(SEC)"  写道:
> >我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。
>
> >此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。
> >
> >非要解决可以写代码,把souce部分不使用sql实现。
> >__
> >
> >在 2020/8/15 下午8:21,“forideal” 写入:
> >
> >Hi 赵一旦,
> >
> >
> >目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。
> >1.并行度超过 topic partition 的时候会造成资源浪费
> >2.并行度超过 topic partition 后,checkpoint 也无法正常触发了
> >
> >
> >Best forideal
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-08-14 12:03:32,"赵一旦"  写道:
> >>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗?
> >>
> >>Xingbo Huang  于2020年8月14日周五 下午12:01写道:
> >>
> >>> Hi,
> >>>
> >>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度
> >>>
> >>> Best,
> >>> Xingbo
> >>>
> >>> Zhao,Yi(SEC)  于2020年8月14日周五 上午10:49写道:
> >>>
> >>> >
> 并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
> >>> >
> >>> > 发件人: "Zhao,Yi(SEC)" 
> >>> > 日期: 2020年8月13日 星期四 上午11:44
> >>> > 收件人: "user-zh@flink.apache.org" 
> >>> > 主题: 如何设置FlinkSQL并行度
> >>> >
> >>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
> >>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
> >>> >
> >>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
> >>> >
> >>> >
> >>>
> >
> >
>


-- 

Best,
Benchao Li


Re: JDBC connector 写入 mysql 每秒大约只有 200KB 是正常的吗

2020-08-20 文章 Benchao Li
每秒1多条不算少了吧,如果还想再高一些,可以提高一下sink.buffer-flush.max-rows配置,默认是100

LittleFall <1578166...@qq.com> 于2020年8月20日周四 下午7:56写道:

> 这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink.
>
> ```java
> package main;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
>
> public class Main {
>
> public static void main(String[] args) {
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
> StreamExecutionEnvironment.getExecutionEnvironment(),
>
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> );
>
> tEnv.executeSql("CREATE TABLE gen_stuff (\n" +
> "\tstuff_id int,\n" +
> "\tstuff_base_id int,\n" +
> "\tstuff_name varchar(20)\n" +
> ") WITH (\n" +
> " 'connector' = 'datagen'," +
> "'rows-per-second'='1000'," +
> "'fields.stuff_id.kind'='sequence'," +
> "'fields.stuff_id.start'='1'," +
> "'fields.stuff_id.end'='1000'," +
> "'fields.stuff_name.length'='15'" +
> ")"
> );
> tEnv.executeSql("CREATE TABLE result_stuff (\n" +
> "\tstuff_id int,\n" +
> "\tstuff_base_id int,\n" +
> "\tstuff_name varchar(20)\n" +
> ") WITH (\n" +
> "\t'connector'  = 'jdbc',\n" +
> "\t'url'=
> 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" +
> "\t'table-name' = 'result_stuff',\n" +
> "\t'username'   = 'root',\n" +
> "\t'password'   = ''\n" +
> ")"
> );
>
> tEnv.executeSql("insert into result_stuff select stuff_id,
> stuff_base_id, stuff_name from gen_stuff");
> }
> }
> ```
>
> 然而,mysql 每秒大约只多 1 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。
>
> 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。
>
> 我使用的和 jdbc 有关的依赖如下:
>
> ```xml
> 
> org.apache.flink
>
> flink-connector-jdbc_${scala.binary.version}
> ${flink.version}
> 
> 
> mysql
> mysql-connector-java
> 8.0.21
> 
> ```
>
> (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 Benchao Li
不同的key应该用的是同一个state的实例,但是state下面会处理不同的key对应的state,也就是key对于用户来说是透明的。
比如你用一个MapState,那就是约等于每个key都有一个Map实例,不同key之间是独立的。

shizk233  于2020年8月20日周四 下午5:03写道:

> 谢谢大佬解答。
> 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例,
> 那么数据流上的不同key(不是map里的hash key)是怎么保证只获取到对应的那部分map state数据呢?
>
> 按我的理解,按key分流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。
>
> Benchao Li  于2020年8月20日周四 下午4:40写道:
>
> > Hi,
> >
> > 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。
> >
> > shizk233  于2020年8月20日周四 下午2:22写道:
> >
> > > Hi all,
> > >
> > > 请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。
> > > 问题1:
> > > 如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系?
> > > 还是这两个方法是顺序执行的?
> > >
> > > 问题2:
> > > 虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的
> > > Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 Benchao Li
Hi,

问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。

shizk233  于2020年8月20日周四 下午2:22写道:

> Hi all,
>
> 请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。
> 问题1:
> 如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系?
> 还是这两个方法是顺序执行的?
>
> 问题2:
> 虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的
> Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗?
>


-- 

Best,
Benchao Li


Re: flink 1.11 web ui请教

2020-08-19 文章 Benchao Li
Hi,

因为目前的维表Join实现本身是没有用shuffle的,也就是维表算子跟上面的算子的连接方式为forward。
其他的join,都是直接按照join的key进行hash的,所以跟前面的算子的链接方式为hash。

 于2020年8月19日周三 下午3:33写道:

> 版本:1.11
> 部署:standalone
>
> 数据从kafka写到kafka
> 1.提交的任务包含两个维表join和两条insert
> 语句,但是在中间的执行图,只有一个方框。其他有些join任务会有不同的框,用hash连线表示。这是什么原因?
>
> 2.底下的 records received等几个列都是0。怎么样才会统计?
>
>
>

-- 

Best,
Benchao Li


Re: flink 1.11 order by rowtime报错

2020-08-19 文章 Benchao Li
Hi 斌斌,

感觉你应该是遇到了一个已知的bug[1]

[1] https://issues.apache.org/jira/browse/FLINK-16827

郑斌斌  于2020年8月19日周三 下午1:20写道:

>
> 报下面的这个错误,并行度设置为1就没有问题了,不知道为什么
>
> java.lang.NullPointerExcpetion
>   at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
>   at
> org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
>   at
> org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:101)
>
> --
> 发件人:china_tao 
> 发送时间:2020年8月19日(星期三) 00:17
> 收件人:user-zh 
> 主 题:Re: flink 1.11 order by rowtime报错
>
> 错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>

-- 

Best,
Benchao Li


Re: hive Streaming Reading 无法分组统计

2020-08-19 文章 Benchao Li
Hi,

你的hint的用法应该不太对,可以参考下文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints.html

18579099...@163.com <18579099...@163.com> 于2020年8月19日周三 上午12:17写道:

>   SELECT
> id,
>  count(1)
> FROM
>  hive_user_parquet
> GROUP BY
> id
> /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-08-18') */
>   通过分组统计好像是会报语法错误的,这是什么原因造成的呢
>
>
>
> 18579099...@163.com
>


-- 

Best,
Benchao Li


Re: Print SQL connector无法正常使用

2020-08-19 文章 Benchao Li
Hi,

看起来你用的hbase的配置还是老的配置,1.11中已经更新的新的connector配置选项了,
你可以尝试下用新版的connector配置[1]。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html

xiao cai  于2020年8月17日周一 上午11:52写道:

> Hi All:
> 目前使用flink sql的Print SQL connector,想要将查询的结果打印出来,结果报错:
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> the classpath.
>
>
> 可以保证:HBase-connector是在lib包下存在的,是否我还需要在lib下添加什么依赖?
>
>
> 下面为执行的sql:
>
>
> CREATE TABLE dimension (
> rowKey STRING,
> cf ROW,
> tas BIGINT
> ) WITH (
> 'connector.type' = 'hbase',
> 'connector.version' = '1.4.3',
> 'connector.table-name' = ’test',
> 'connector.write.buffer-flush.max-rows' = '10',
> 'connector.zookeeper.quorum' = ‘IP:port',
> 'connector.zookeeper.znode.parent' = '/hbase',
> );
>
>
> CREATE TABLE print_table (
>  f0 STRING,
>  f1 INT,
>  f2 BIGINT,
>  f3 BIGINT
> ) WITH (
>  'connector' = 'print'
> );
>
>
> insert into print_table
> select rowKey, cf.age, cf.area, tas
> from dimension



-- 

Best,
Benchao Li


  1   2   3   4   >