Re: flinksql 经过优化后,group by字段少了
你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11 版本开始就已经用的是这个 calcite 版本了。 所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个 issue 来报一个 bug。 PS: 上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。 [1] https://issues.apache.org/jira/browse/CALCITE-3531 ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道: > > 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite > 中修复了,https://github.com/apache/calcite/pull/1602/files > 但是,flink 中引用的 calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。 > > > > > --原始邮件-- > 发件人: > "user-zh" > > 发送时间:2024年5月20日(星期一) 上午10:32 > 收件人:"user-zh" > 主题:Re: flinksql 经过优化后,group by字段少了 > > > > 看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。 > > 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛? > > ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid 于2024年5月19日周日 01:01写道: > > create view tmp_view as > SELECT > dt, -- 2 > uid, -- 0 > uname, -- 1 > uage -- 3 > from > kafkaTable > where dt = cast(CURRENT_DATE as string); > > insert into printSinkTable > select > dt, uid, uname, sum(uage) > from tmp_view > group by > dt, > uid, > uname; > > > > sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。 > 但是,经过优化后,生成的 物理结构如下: > == Optimized Execution Plan == > Sink(table=[default_catalog.default_database.printSinkTable], > fields=[dt, uid, uname, EXPR$3]) > +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3]) > nbsp; nbsp;+- GroupAggregate(groupBy=[uid, uname], > select=[uid, uname, SUM(uage) AS EXPR$3]) > nbsp; nbsp; nbsp; +- Exchange(distribution=[hash[uid, > uname]]) > nbsp; nbsp; nbsp; nbsp; nbsp;+- > Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))]) > nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; +- > TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], > fields=[uid, uname, dt, uage]) > > > > 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: flinksql 经过优化后,group by字段少了
看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛? ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道: > > create view tmp_view as > SELECT > dt, -- 2 > uid, -- 0 > uname, -- 1 > uage -- 3 > from > kafkaTable > where dt = cast(CURRENT_DATE as string); > > insert into printSinkTable > select > dt, uid, uname, sum(uage) > from tmp_view > group by > dt, > uid, > uname; > > > > sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。 > 但是,经过优化后,生成的 物理结构如下: > == Optimized Execution Plan == > Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, > uid, uname, EXPR$3]) > +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3]) > +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, > SUM(uage) AS EXPR$3]) >+- Exchange(distribution=[hash[uid, uname]]) > +- Calc(select=[uid, uname, uage], > where=[(dt = CAST(CURRENT_DATE()))]) > +- > TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], > fields=[uid, uname, dt, uage]) > > > > 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢 -- Best, Benchao Li
Re: [ANNOUNCE] Apache Flink 1.19.0 released
Congratulations! And thanks to all release managers and everyone involved in this release! Yubin Li 于2024年3月18日周一 18:11写道: > > Congratulations! > > Thanks to release managers and everyone involved. > > On Mon, Mar 18, 2024 at 5:55 PM Hangxiang Yu wrote: > > > > Congratulations! > > Thanks release managers and all involved! > > > > On Mon, Mar 18, 2024 at 5:23 PM Hang Ruan wrote: > > > > > Congratulations! > > > > > > Best, > > > Hang > > > > > > Paul Lam 于2024年3月18日周一 17:18写道: > > > > > > > Congrats! Thanks to everyone involved! > > > > > > > > Best, > > > > Paul Lam > > > > > > > > > 2024年3月18日 16:37,Samrat Deb 写道: > > > > > > > > > > Congratulations ! > > > > > > > > > > On Mon, 18 Mar 2024 at 2:07 PM, Jingsong Li > > > > wrote: > > > > > > > > > >> Congratulations! > > > > >> > > > > >> On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > >>> > > > > >>> Congratulations, thanks for the great work! > > > > >>> > > > > >>> Best, > > > > >>> Rui > > > > >>> > > > > >>> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee > > > > >> wrote: > > > > >>>> > > > > >>>> The Apache Flink community is very happy to announce the release of > > > > >> Apache Flink 1.19.0, which is the fisrt release for the Apache Flink > > > > 1.19 > > > > >> series. > > > > >>>> > > > > >>>> Apache Flink® is an open-source stream processing framework for > > > > >> distributed, high-performing, always-available, and accurate data > > > > streaming > > > > >> applications. > > > > >>>> > > > > >>>> The release is available for download at: > > > > >>>> https://flink.apache.org/downloads.html > > > > >>>> > > > > >>>> Please check out the release blog post for an overview of the > > > > >> improvements for this bugfix release: > > > > >>>> > > > > >> > > > > > > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/ > > > > >>>> > > > > >>>> The full release notes are available in Jira: > > > > >>>> > > > > >> > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282 > > > > >>>> > > > > >>>> We would like to thank all contributors of the Apache Flink > > > community > > > > >> who made this release possible! > > > > >>>> > > > > >>>> > > > > >>>> Best, > > > > >>>> Yun, Jing, Martijn and Lincoln > > > > >> > > > > > > > > > > > > > > > > > -- > > Best, > > Hangxiang. -- Best, Benchao Li
Re: 急 [FLINK-34170] 何时能够修复?
FLINK-34170 只是一个UI的展示问题,并不影响实际的运行。 JDBC Connector 维表下推的 filter 不生效问题,已经在 FLINK-33365 中修复了,最新的 JDBC Connector 版本中已经带上了这个修复,你可以试一下~ casel.chen 于2024年3月15日周五 10:39写道: > > 我们最近在使用Flink 1.17.1开发flink sql作业维表关联使用复合主键时遇到FLINK-34170描述一样的问题,请问这个major > issue什么时候在哪个版本后能够修复呢?谢谢! > > > select xxx from kafka_table as kt > left join phoenix_table FORSYSTEM_TIMEASOFphoenix_table.proctime as pt > on kt.trans_id=pt.trans_id and pt.trans_date = > DATE_FORMAT(CURRENT_TIMESTAMP,'MMdd'); > > > phoenix表主键是 trans_id + trans_date > 复合主键,实际作业运行发现flink只会带trans_id字段对phoenix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤 > > > https://issues.apache.org/jira/browse/FLINK-34170 -- Best, Benchao Li
Re: RE: lock up表过滤条件下推导致的bug
这个问题应该是跟 FLINK-33365[1] 中说的是同一个问题,这个已经在修复中了,在最新的 JDBC Connector 版本中会修复它。 [1] https://issues.apache.org/jira/browse/FLINK-33365 杨光跃 于2023年12月26日周二 09:25写道: > > > > > > > > CompiledPlan plan = env.compilePlanSql("insert into out_console " + > " select r.apply_id from t_purch_apply_sent_route r " + > " left join t_purch_apply_sent_route_goods FOR SYSTEM_TIME AS OF r.pt as t " > + > "ON t.apply_id = r.apply_id and t.isdel = r.isdel" + > " where r.apply_id = 61558439941351 and t.route_goods_id is not null and > t.is_change = 2 " ); > > > > > > > > > > > > 在 2023-12-25 20:46:36,"Jiabao Sun" 写道: > >Hi, > > > >邮件中的图片没显示出来,麻烦把 SQL 贴出来一下。 > > > >Best, > >Jiabao > > > > > >On 2023/12/25 12:22:41 杨光跃 wrote: > >> 我的sql如下: > >> 、 > >> > >> > >> t_purch_apply_sent_route 是通过flink cdc创建的 > >> t_purch_apply_sent_route_goods 是普通的jdbc > >> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据 > >> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推 > >> 这应该算是bug吧,或者要满足我的预期,该怎么写sql? > >> > >> > >> > >> -- Best, Benchao Li
Re: Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败
hiveserver2 endpoint 就是让 flink gateway 直接变成 hive server2,对外来讲它就是 hive server2 了,它可以直接跟已有的跟 hive server2 的工具配合一起使用。 但是现在你其实用的是 flink jdbc driver,这个并不是跟 hive server2 交互,它就是跟 flink gateway 交互,所以你用hive server2的模式启动,它就不认识了。 casel.chen 于2023年10月30日周一 14:36写道: > > 果然不指定endpoint为hiveserver2类型后使用hive beeline工具连接上了。感谢! > 不过我仍然有个疑问,看官网文档上有写提供 hiveserver2 endpoint > 是为了兼容hive方言,按理也应该可以使用beeline连接上,因为原本beeline支持连接hiveserver2 > 以下是原文: > HiveServer2 Endpoint is compatible with HiveServer2 wire protocol and allows > users to interact (e.g. submit Hive SQL) with Flink SQL Gateway with existing > Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on. > 这里有提到Beeline工具,难道不是 beeline> !connect jdbc:flink://localhost:8083 这样的连接方式了? > > > > > > > > > > > > > > > > > > > > 在 2023-10-30 11:27:15,"Benchao Li" 写道: > >Hi casel, > > > >Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway > >的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。 > > > >casel.chen 于2023年10月29日周日 17:24写道: > >> > >> 1. 启动flink集群 > >> bin/start-cluster.sh > >> > >> > >> 2. 启动sql gateway > >> bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 > >> > >> > >> 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下 > >> > >> > >> 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车 > >> $ bin/beeline > >> SLF4J: Class path contains multiple SLF4J bindings. > >> SLF4J: Found binding in > >> [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> SLF4J: Found binding in > >> [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] > >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > >> explanation. > >> SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] > >> Beeline version 3.1.2 by Apache Hive > >> beeline> !connect jdbc:flink://localhost:8083 > >> Connecting to jdbc:flink://localhost:8083 > >> Enter username for jdbc:flink://localhost:8083: > >> Enter password for jdbc:flink://localhost:8083: > >> Failed to create the executor. > >> 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T( > >> . . . . . . . . . . . . . . . . . . . .> a INT, > >> . . . . . . . . . . . . . . . . . . . .> b VARCHAR(10) > >> . . . . . . . . . . . . . . . . . . . .> ) WITH ( > >> . . . . . . . . . . . . . . . . . . . .> 'connector' = 'filesystem', > >> . . . . . . . . . . . . . . . . . . . .> 'path' = 'file:///tmp/T.csv', > >> . . . . . . . . . . . . . . . . . . . .> 'format' = 'csv' > >> . . . . . . . . . . . . . . . . . . . .> ); > >> Failed to create the executor. > >> Connection is already closed. > >> > > > > > >-- > > > >Best, > >Benchao Li -- Best, Benchao Li
Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败
Hi casel, Flink JDBC 链接到 gateway 目前使用的是 flink 的 gateway 接口,所以你在启动 gateway 的时候不用指定 endpoint 为 hiveserver2 类型,用 Flink 默认的 gateway endpoint 类型即可。 casel.chen 于2023年10月29日周日 17:24写道: > > 1. 启动flink集群 > bin/start-cluster.sh > > > 2. 启动sql gateway > bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2 > > > 3. 将flink-sql-jdbc-driver-bundle-1.18.0.jar放到apache-hive-3.1.2-bin/lib目录下 > > > 4. 到apache-hive-3.1.2-bin目录下启动beeline连接sql gateway,提示输入用户名和密码时直接按的回车 > $ bin/beeline > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/Users/admin/dev/hadoop-3.3.4/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/admin/dev/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] > Beeline version 3.1.2 by Apache Hive > beeline> !connect jdbc:flink://localhost:8083 > Connecting to jdbc:flink://localhost:8083 > Enter username for jdbc:flink://localhost:8083: > Enter password for jdbc:flink://localhost:8083: > Failed to create the executor. > 0: jdbc:flink://localhost:8083 (closed)> CREATE TABLE T( > . . . . . . . . . . . . . . . . . . . .> a INT, > . . . . . . . . . . . . . . . . . . . .> b VARCHAR(10) > . . . . . . . . . . . . . . . . . . . .> ) WITH ( > . . . . . . . . . . . . . . . . . . . .> 'connector' = 'filesystem', > . . . . . . . . . . . . . . . . . . . .> 'path' = 'file:///tmp/T.csv', > . . . . . . . . . . . . . . . . . . . .> 'format' = 'csv' > . . . . . . . . . . . . . . . . . . . .> ); > Failed to create the executor. > Connection is already closed. > -- Best, Benchao Li
Re: [ANNOUNCE] Apache Flink 1.18.0 released
Great work, thanks everyone involved! Rui Fan <1996fan...@gmail.com> 于2023年10月27日周五 10:16写道: > > Thanks for the great work! > > Best, > Rui > > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam wrote: > > > Finally! Thanks to all! > > > > Best, > > Paul Lam > > > > > 2023年10月27日 03:58,Alexander Fedulov 写道: > > > > > > Great work, thanks everyone! > > > > > > Best, > > > Alexander > > > > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser > > > wrote: > > > > > >> Thank you all who have contributed! > > >> > > >> Op do 26 okt 2023 om 18:41 schreef Feng Jin > > >> > > >>> Thanks for the great work! Congratulations > > >>> > > >>> > > >>> Best, > > >>> Feng Jin > > >>> > > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu wrote: > > >>> > > >>>> Congratulations, Well done! > > >>>> > > >>>> Best, > > >>>> Leonard > > >>>> > > >>>> On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee > > >>>> wrote: > > >>>> > > >>>>> Thanks for the great work! Congrats all! > > >>>>> > > >>>>> Best, > > >>>>> Lincoln Lee > > >>>>> > > >>>>> > > >>>>> Jing Ge 于2023年10月27日周五 00:16写道: > > >>>>> > > >>>>>> The Apache Flink community is very happy to announce the release of > > >>>>> Apache > > >>>>>> Flink 1.18.0, which is the first release for the Apache Flink 1.18 > > >>>>> series. > > >>>>>> > > >>>>>> Apache Flink® is an open-source unified stream and batch data > > >>>> processing > > >>>>>> framework for distributed, high-performing, always-available, and > > >>>>> accurate > > >>>>>> data applications. > > >>>>>> > > >>>>>> The release is available for download at: > > >>>>>> https://flink.apache.org/downloads.html > > >>>>>> > > >>>>>> Please check out the release blog post for an overview of the > > >>>>> improvements > > >>>>>> for this release: > > >>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/ > > >>>>>> > > >>>>>> The full release notes are available in Jira: > > >>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885 > > >>>>>> > > >>>>>> We would like to thank all contributors of the Apache Flink > > >> community > > >>>> who > > >>>>>> made this release possible! > > >>>>>> > > >>>>>> Best regards, > > >>>>>> Konstantin, Qingsheng, Sergey, and Jing > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > > -- Best, Benchao Li
Re: GenericRowData与BinaryRowData的转换
Hi zilong, 应该是没有内置的方法直接进行转换的,如果有需要,还是需要自己根据schema做一遍读取和写入。 另外,在FLINK-24403[1] 中加强了对于复杂类型的print能力,可以直接把他们cast成string来打印。 [1] https://issues.apache.org/jira/browse/FLINK-24403 zilong xiao 于2023年3月13日周一 16:22写道: > hi, benchao, 想问下有什么办法可以将BinaryRowData转成GenericRowData吗?我们业务场景需要对RowData > toString,BinaryRowData没有实现该方法QQAQ > > Benchao Li 于2021年4月9日周五 10:42写道: > > > GenericRowData和BinaryRowData都是RowData这个接口的具体实现。 > > 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。 > > > > 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData, > > 比如典型的就是序列化的时候都会按照BinaryRowData来序列化。 > > > > Luna Wong 于2021年4月8日周四 下午7:36写道: > > > > > 我看Kafka Connector源码生成的是GenericRowData,到Jdbc > > > Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么? > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: Flink SQL Client 解析 Protobuf
Hi Min, ProtoBuf Format[1] 有一个相关的PR,我们正在推进review和改进,预期是在1.16 中可以release出去。你也可以基于这个PR的代码编译打包一下,提前试用一下。 [1] https://github.com/apache/flink/pull/14376 Min Tu 于2022年7月4日周一 02:38写道: > 各位大佬, > > 我们想利用 Flink SQL Client 解析 Kafka 数据流: 对于Kafka 数据流是Json 或者Avro 格式,已经可以解析, > 但是对于 Protobuf 数据格式,Flink SQL Client 没有直接支持。 > > Flink SQL Client 有一个 Raw 格式选项,不过我们还没有找到如何使用这个的文档; > > 看看大家有没有相关的经验可以分享 > > 多谢 > -- Best, Benchao Li
Re: Re: Re: flink sql回撤流sink优化问题
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。 casel.chen 于2022年1月7日周五 07:42写道: > mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 > > > > > > > > > > > > > > > > > > 在 2022-01-06 20:43:00,"Benchao Li" 写道: > >这个问题可以用mini-batch[1]来解决呀 > > > >[1] > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation > > > >casel.chen 于2021年12月26日周日 18:01写道: > > > >> 你说的是upsert-kafka的这两个参数吗? > >> > >> sink.buffer-flush.max-rows > >> sink.buffer-flush.interval > >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink > >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2021-12-25 22:54:19,"郭伟权" 写道: > >> > >> > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > >> > > >> >casel.chen 于2021年12月23日周四 08:15写道: > >> > > >> >> flink sql中aggregate without > >> >> > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > >> >> > >> >> > >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): > >> >> > >> >> orderid. categorydt > amt > >> >> > >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 > >> >> > >> >> > >> >> > >> >> > >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > >> >> > >> >> > >> >> > >> >> INSERT INTO mysql_sink_table > >> >> > >> >> SELECT category, dt, LAST_VALUE(total) > >> >> > >> >> OVER ( > >> >> > >> >> PARTITION BY category > >> >> > >> >> ORDER BY PROCTIME() > >> >> > >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > >> >> > >> >> ) AS var1 > >> >> > >> >> FROM ( > >> >> > >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, > dt > >> >> > >> >> ); > >> > > > > > >-- > > > >Best, > >Benchao Li > -- Best, Benchao Li
Re: Re: flink sql回撤流sink优化问题
这个问题可以用mini-batch[1]来解决呀 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation casel.chen 于2021年12月26日周日 18:01写道: > 你说的是upsert-kafka的这两个参数吗? > > sink.buffer-flush.max-rows > sink.buffer-flush.interval > 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink > kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 > > > > > > > > > > > > > > > > > > 在 2021-12-25 22:54:19,"郭伟权" 写道: > > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > > > >casel.chen 于2021年12月23日周四 08:15写道: > > > >> flink sql中aggregate without > >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > >> > >> > >> 例如有下面binlog cdc购买数据(订单购买金额会更新): > >> > >> orderid. categorydt amt > >> > >> 订单id 商品类型 购买时间(MMddHH) 购买金额 > >> > >> > >> > >> > >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > >> > >> > >> > >> INSERT INTO mysql_sink_table > >> > >> SELECT category, dt, LAST_VALUE(total) > >> > >> OVER ( > >> > >> PARTITION BY category > >> > >> ORDER BY PROCTIME() > >> > >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > >> > >> ) AS var1 > >> > >> FROM ( > >> > >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > >> > >> ); > -- Best, Benchao Li
Re: FlinkSQL Source和Sink的Operator name为什么格式不同
Hi Ada, 这应该就是一个实现上的失误: 1. source的名字是calcite的`TableScan#explainTerms`里面实现的,用的是`explainTerms(pw).item("table", this.table.getQualifiedName())` 2. sink里的名字是flink里的`Sink#explainTerms`实现的,用的是`.explainTerms(pw).item("table", tableIdentifier.asSummaryString())` yidan zhao 于2021年9月30日周四 上午11:43写道: > 我猜哈,是因为source支持多张表。比如多个表union支持select的情况。 > > Ada Luna 于2021年9月29日周三 下午6:02写道: > > > Source: TableSourceScan(table=[[default_catalog, default_database, > > ods_k]], fields=[id, name]) > > Sink: Sink(table=[default_catalog.default_database.ads_k], fields=[id, > > name]) > > Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id, > > name])) > > > > > > TableSourceScan 和 Sink相比多了个 中括号,并且采用 ',' 分割名字功空间,这是为什么 > > > -- Best, Benchao Li
Re: flink-1.12.0 流模式 使用 lag问题
Hi,这个是一个已知的bug[1],已经在1.13.1以及1.4版本修复了。 可以使用一下1.13.1试一下,1.4版本现在也正在投票中了,应该很快就会发布出来了。 [1] https://issues.apache.org/jira/browse/FLINK-19449 kcz <573693...@qq.com.invalid> 于2021年9月22日周三 上午11:41写道: > 如何使用才是正确的,求大佬帮看看 > behavior,next_bv 字段内容一直是保持一致的,无法得到自己想要的结果 > > > 发送的数据 > { > "user_id":1, > "item_id":1, > "behavior":"pv1" > } > { > "user_id":1, > "item_id":1, > "behavior":"pv2" > } > > > > > > > CREATE TABLE KafkaTable ( > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING, > proctime as PROCTIME() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'user_behavior', > 'properties.bootstrap.servers' = '', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > > > > SELECT > user_id, > item_id, > behavior, > next_bv > FROM > ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER > BY proctime ) AS next_bv FROM KafkaTable ) t; -- Best, Benchao Li
Re: Flink 从checkpoint恢复时,部分状态没有正确恢复
这个问题已经在1.12中修复了,参考: https://issues.apache.org/jira/browse/FLINK-18688 Benchao Li 于2021年8月30日周一 下午7:49写道: > Hi xingxing, > > 看起来你可能也遇到了这个bug了。 > 我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化 > 的结果不稳定,进而导致状态恢复会错误。 > 先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、list、map、row等等; > 然后再解释下这个结果不稳定的原因,这个是因为在底层的代码生成里面有一个优化,会按照类型进行分组,然后进行优化, > 但是这个分组的过程用的是一个HashMap[1],会导致字段顺序不是确定性的,有时候是这个顺序,有时候又是另外一个顺序, > 导致最终的BinaryRow的序列化结果是不稳定的,进而导致无法从checkpoint恢复。 > > 然后典型的多种变长类型,其实是varchar nullable 和 varchar not null 以及 > char(n),尤其是你这种用了很多常量字符串的场景, > 容易产生后两种类型,在加上普通字段以及函数都会产生的第一种类型,就会触发这个bug了。 > > [1] > https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala#L92 > > dixingxing 于2021年8月25日周三 下午8:57写道: > >> Hi Flink 社区: >> 我们的Flink版本是1.9.2,用的是blink planer,我们今天遇到一个问题,目前没有定位到原因,现象如下: >> >> 手动重启任务时,指定了从一个固定的checkpoint恢复时,有一定的概率,一部分状态数据无法正常恢复,启动后Flink任务本身可以正常运行,且日志中没有明显的报错信息。 >> 具体现象是:type=realshow的数据没有从状态恢复,也就是从0开始累加,而type=show和type=click的数据是正常从状态恢复的。 >> >> >> SQL大致如下: >> createview view1 as >> select event_id, act_time, device_id >> from table1 >> where `getStringFromJson`(`act_argv`, 'ispin', '') <>'1' >> and event_id in >> ('article_newest_list_show','article_newest_list_sight_show', >> 'article_list_item_click', 'article_auto_video_play_click'); >> >> >> --天的数据 >> insertinto table2 >> select platform, type, `time`, count(1) as pv, hll_uv(device_id) as uv >> from >> (select'03'as platform, trim(casewhen event_id >> ='article_newest_list_show'then'show' >> when event_id ='article_newest_list_sight_show'then'realshow' >> when event_id ='article_list_item_click'then'click'else''end) astype, >> `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`, >> device_id >> from view1 >> where event_id in >> ('article_newest_list_show','article_newest_list_sight_show', >> 'article_list_item_click') >> unionall >> select'03'as platform, 'click_total'astype, >> `date_parse`(`act_time`, 'yyyy-MM-dd HH:mm:ss', 'yyyyMMdd') as `time`, >> device_id >> from view1 >> where event_id in ('article_list_item_click', >> 'article_auto_video_play_click'))a >> groupby platform, type, `time`; >> >> >> 期待大家的帮助与回复,希望能给些问题排查的思路! >> >> >> >> > > -- > > Best, > Benchao Li > -- Best, Benchao Li
Re: Flink 从checkpoint恢复时,部分状态没有正确恢复
Hi xingxing, 看起来你可能也遇到了这个bug了。 我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化 的结果不稳定,进而导致状态恢复会错误。 先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、list、map、row等等; 然后再解释下这个结果不稳定的原因,这个是因为在底层的代码生成里面有一个优化,会按照类型进行分组,然后进行优化, 但是这个分组的过程用的是一个HashMap[1],会导致字段顺序不是确定性的,有时候是这个顺序,有时候又是另外一个顺序, 导致最终的BinaryRow的序列化结果是不稳定的,进而导致无法从checkpoint恢复。 然后典型的多种变长类型,其实是varchar nullable 和 varchar not null 以及 char(n),尤其是你这种用了很多常量字符串的场景, 容易产生后两种类型,在加上普通字段以及函数都会产生的第一种类型,就会触发这个bug了。 [1] https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala#L92 dixingxing 于2021年8月25日周三 下午8:57写道: > Hi Flink 社区: > 我们的Flink版本是1.9.2,用的是blink planer,我们今天遇到一个问题,目前没有定位到原因,现象如下: > > 手动重启任务时,指定了从一个固定的checkpoint恢复时,有一定的概率,一部分状态数据无法正常恢复,启动后Flink任务本身可以正常运行,且日志中没有明显的报错信息。 > 具体现象是:type=realshow的数据没有从状态恢复,也就是从0开始累加,而type=show和type=click的数据是正常从状态恢复的。 > > > SQL大致如下: > createview view1 as > select event_id, act_time, device_id > from table1 > where `getStringFromJson`(`act_argv`, 'ispin', '') <>'1' > and event_id in > ('article_newest_list_show','article_newest_list_sight_show', > 'article_list_item_click', 'article_auto_video_play_click'); > > > --天的数据 > insertinto table2 > select platform, type, `time`, count(1) as pv, hll_uv(device_id) as uv > from > (select'03'as platform, trim(casewhen event_id > ='article_newest_list_show'then'show' > when event_id ='article_newest_list_sight_show'then'realshow' > when event_id ='article_list_item_click'then'click'else''end) astype, > `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`, > device_id > from view1 > where event_id in > ('article_newest_list_show','article_newest_list_sight_show', > 'article_list_item_click') > unionall > select'03'as platform, 'click_total'astype, > `date_parse`(`act_time`, '-MM-dd HH:mm:ss', 'MMdd') as `time`, > device_id > from view1 > where event_id in ('article_list_item_click', > 'article_auto_video_play_click'))a > groupby platform, type, `time`; > > > 期待大家的帮助与回复,希望能给些问题排查的思路! > > > > -- Best, Benchao Li
Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。
这是一个已知的问题。社区已经有相关issue[1] 在跟进修复了,感兴趣的话可以了解下~ [1] https://issues.apache.org/jira/browse/FLINK-18934 jie mei 于2021年4月12日周一 下午2:59写道: > 此外,在事件时间,场景下,如果一个 Stream A 有消息, 另一个 Stream B 没有消息进行 UNION ALL。那么 Stream B > 的消息永远是一个 Long.MIN_VALUE, 进行水印对其的时候,UNION ALL 后的水印取所有 CHANNEL 的最小水印,也就是 > Long.MIN_VALUE, 这就导致分组滚动窗口一致得不到计算。 > > jie mei 于2021年4月12日周一 上午11:24写道: > > > 问题已经解决,因为我的 StreamEnv 默认设置为事件时间。去掉就可以了,这导致了watermark没有生成。 > > > > jie mei 于2021年4月12日周一 上午1:49写道: > > > >> 大家好,我有一个 Flink 程序, 使用事件时间做分组窗口计算,但是无法触发窗口计算。我Debug到 WindowOperator, 下, > >> 发现 WindowOperator 的 TriggerContext中的当前水印一直是一个负数, StreamTaskNetworkInput > >> 中的 processElement 方法没有接受到 watermark 消息, recordOrMark.isWatermark() == > false。 > >> > >> 我自己的怀疑难道是事件时间每设置对? 但是对比了文档,应该是可以的。下面是我的 DDL > >> > >> create table input_table ( > >> `dim` varchar, > >> `server_time` bigint, > >> `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000, > >> '-MM-dd HH:mm:ss')), > >> WATERMARK FOR `event_time` AS `event_time` > >> ) > >> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, > >> `dim`, > >> count(1), > >> FROM input_table > >> GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` > >> > >> > >> > >> *Best Regards* > >> *Jeremy Mei* > >> > > > > > > -- > > > > *Best Regards* > > *Jeremy Mei* > > > > > -- > > *Best Regards* > *Jeremy Mei* > -- Best, Benchao Li
Re: GenericRowData与BinaryRowData的转换
GenericRowData和BinaryRowData都是RowData这个接口的具体实现。 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData, 比如典型的就是序列化的时候都会按照BinaryRowData来序列化。 Luna Wong 于2021年4月8日周四 下午7:36写道: > 我看Kafka Connector源码生成的是GenericRowData,到Jdbc > Sink类型编程BinaryRowData了。Runtime层把GenericRowData转换BinaryRowData的规则是什么? > -- Best, Benchao Li
Re: Flink1.11执行sql当判空使用<> null,程序直接结束
嗯,是这样的。 datayangl 于2021年3月19日周五 下午5:55写道: > calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配 > 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: Flink1.11执行sql当判空使用<> null,程序直接结束
Hi datayangl, 这是因为kafka_table.src_ip <> null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。 关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1]. 简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown, 然后这个在filter条件里面会被视作false。 [1] https://modern-sql.com/concept/three-valued-logic datayangl 于2021年3月19日周五 下午4:02写道: > 环境:flink1.11: > 代码如下: > val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv > val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv > val sql = """SELECT > CASE > WHEN > kafka_table.log_type = 'detect' > AND > kafka_table.event_level = 3 > THEN 3 > ELSE 0 > END as weight, > kafka_table.src_ip as kafka_table_src_ip_0, > kafka_table.dev_type as kafka_table_dev_type_0 > FROM > kafka_table > WHERE > kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5 > AND > kafka_table.src_ip <> null > AND > kafka_table.event_level > 0 > AND > kafka_table.dev_type = 1 > > > val data:Table = tableEnv.sqlQuery(sql) > val result = tableEnv.toRetractStream[Row](data) > result.print(">") > """ > > > > 现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip > is > not null 可以正常运行并一直产生数据。 > > 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flink sql 的 count(distinct )问题
Hi, 你可以理解为用的是MapState来保存的状态。 op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道: > 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态 -- Best, Benchao Li
Re: UDF 重复调用的问题、
我没有搜到相关的issue,所以我先建了一个issue[1]。 这个优化相对来说影响比较大,需要仔细的设计和权衡,所以在社区推进的速度 可能没有办法保证,大家感兴趣的可以在issue里去讨论。 [1] https://issues.apache.org/jira/browse/FLINK-21573 Qishang 于2021年3月3日周三 上午11:03写道: > Hi Benchao. > > 现在的场景是UDF中去查询外部存储,数据量不大,但是执行多次还是在一个算子里串行的。算子耗时就会变成调用次数的倍数了。 这个影响就有点严重了。 > 这个 feature 社区有规划了吗? > > > Benchao Li 于2021年3月3日周三 上午10:23写道: > > > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。 > > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。 > > > > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在 > > plan的过程中会将表达式完全展开,比如下面的SQL: > > ```SQL > > SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as > > key3 > > FROM ( > > SELECT dump_json_to_map(col1) as my_map > > FROM T > > ) > > ``` > > 这种写法也会将`dump_json_to_map`这个函数执行3次。 > > > > HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道: > > > > > 为什么4次是没问题的,感觉只执行一次才是最优的 > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: UDF 重复调用的问题、
当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在 plan的过程中会将表达式完全展开,比如下面的SQL: ```SQL SELECT my_map['key1'] as key1, my_map['key2'] as key2, my_map['key3'] as key3 FROM ( SELECT dump_json_to_map(col1) as my_map FROM T ) ``` 这种写法也会将`dump_json_to_map`这个函数执行3次。 HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道: > 为什么4次是没问题的,感觉只执行一次才是最优的 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: 关于 stream-stream Interval Join 的问题
你用的是哪个版本的Flink呢? 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 所以你的binlog是怎么读进来的呢?自定义的format? macia kk 于2020年12月10日周四 上午1:06写道: > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time - > INTERVAL 'x' HOUR > > 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness > > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x, > 能够反推出来数据的 currentMaxTimestamp > > currentMaxTimestamp = watermark + maxOutOfOrderness > > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快 > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。 > > > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55 > > {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":" > *2020-12-10T01:02:24Z*"} > > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分 > GMT+08:00) > > 这个 watermark 是未来的时间 > > > > > > macia kk 于2020年12月9日周三 下午11:36写道: > > > 感谢 一旦 和 Benchao > > > > 1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 > Job > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。 > > > > val result = bsTableEnv.sqlQuery(""" > >SELECT * > >FROM ( > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > t1.transaction_id, > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > t1.status, t1.event_time > > FROM main_db as t1 > > LEFT JOIN main_db as t2 > > ON t1.reference_id = t2.reference_id > > WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES > >AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES > >) > > """.stripMargin) > > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > > subtask的watermark。 > > --- > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog. > > > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event > > time,但是有的表又没有这个字段,会导致解析的时候直接报错. > > > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为. > > > > > > Thanks and best regards > > > > > > Benchao Li 于2020年12月9日周三 上午10:24写道: > > > >> Hi macia, > >> > >> 一旦回答的基本比较完整了。 > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。 > >> > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source > subtask见到的最大的watermark > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay > 10个小时,这个已经会导致 > >> 你的没有join到的数据下发会延迟很多了。 > >> > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 > >> > >> 赵一旦 于2020年12月9日周三 上午10:15写道: > >> > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > >> > > >> > > >> > > >> > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > >> > join。 > >> > > >> > (2)此外,还有一个点,这个我也不确认。如果是datastream > >> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > >> > > >> > > >> > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > >> > > >> > macia kk 于2020年12月9日周三 上午1:17写道: > >> > > >> > > @Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > >> > > FLink,可能我的Case 太特殊了. > >> > > > >> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 > >> Binlog,我需要 > >> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 > DB > >> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > >> > > > >> > > 还要注意的是 even time 是 create_time, 这里问题非常大: > >> > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > >> > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > >> > watermark > >> > > forward on. > >> > > > >> > > bsTableEnv.execute
Re: interval join 时checkpoint失败
反压的话,你可以重点看下你使用的是什么state backend, 如果是filesystem,那状态就是放heap的,这种你需要重点看下gc相关的问题; 如果是rocksdb,这种状态是直接序列化到rocksdb中了,一般很少有内存问题,更多的是IO问题,或者CPU瓶颈。 你可以按照找个思路排查一下。 song wang 于2020年12月10日周四 上午11:38写道: > hi,Benchao, > 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢! > > Benchao Li 于2020年12月10日周四 上午11:28写道: > > > 你可以检查下在Checkpoint失败的时候是不是任务已经在反压了, > > 看起来是有可能因为反压导致的Checkpoint超时失败。 > > > > song wang 于2020年12月10日周四 上午10:59写道: > > > > > 各位好, > > > 两个流进行interval join,时间窗口是 > > -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager > > > log中的报错信息为: > > > > > > 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint. > > > CheckpointCoordinator - Checkpoint 143 of job > > > ee4114a1c5413bd02a68b1165090578e expired before completing. > > > > > > > > > 无其他报错信息,最大checkpoint时间为10min; > > > > > > > > > flink版本:1.9.0 > > > > > > checkpooint配置信息为: > > > > > > env.enableCheckpointing(60); > > > > > > > > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > > > > > > > 各位大佬能否给些排查建议呢? > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: interval join 时checkpoint失败
你可以检查下在Checkpoint失败的时候是不是任务已经在反压了, 看起来是有可能因为反压导致的Checkpoint超时失败。 song wang 于2020年12月10日周四 上午10:59写道: > 各位好, > 两个流进行interval join,时间窗口是 -23h,+1h,任务可以正常运行23小时左右,之后便报错checkpoint失败,jobmanager > log中的报错信息为: > > 2020-12-10 10:46:51,813 INFO org.apache.flink.runtime.checkpoint. > CheckpointCoordinator - Checkpoint 143 of job > ee4114a1c5413bd02a68b1165090578e expired before completing. > > > 无其他报错信息,最大checkpoint时间为10min; > > > flink版本:1.9.0 > > checkpooint配置信息为: > > env.enableCheckpointing(60); > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > 各位大佬能否给些排查建议呢? > > > > > > > -- Best, Benchao Li
Re: 关于 stream-stream Interval Join 的问题
Hi macia, 一旦回答的基本比较完整了。 watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。 如果是两侧都有数据,watermark不前进,也都可以正常输出。 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致 你的没有join到的数据下发会延迟很多了。 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。 赵一旦 于2020年12月9日周三 上午10:15写道: > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。 > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > join。 > > (2)此外,还有一个点,这个我也不确认。如果是datastream > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > macia kk 于2020年12月9日周三 上午1:17写道: > > > @Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 > > FLink,可能我的Case 太特殊了. > > > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要 > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集. > > > > 还要注意的是 even time 是 create_time, 这里问题非常大: > > 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱 > > 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 > watermark > > forward on. > > > > bsTableEnv.executeSql(""" > > CREATE TABLE input_database ( > > `table` STRING, > > `database` STRING, > > `data` ROW( > > reference_id STRING, > > transaction_sn STRING, > > transaction_type BIGINT, > > merchant_id BIGINT, > > transaction_id BIGINT, > > status BIGINT > > ), > > ts BIGINT, > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)), > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR > > ) WITH ( > >'connector.type' = 'kafka', > >'connector.version' = '0.11', > >'connector.topic' = 'mytopic', > >'connector.properties.bootstrap.servers' = '', > >'format.type' = 'json' > > ) > > """) > > > > > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。 > > > > val main_db = bsTableEnv.sqlQuery(""" > > | SELECT * > > | FROM input_database > > | WHERE `database` = 'main_db' > > | AND `table` LIKE 'transaction_tab%' > > | """.stripMargin) > > > > val merchant_db = bsTableEnv.sqlQuery(""" > > | SELECT * > > | FROM input_database > > | WHERE `database` = 'merchant_db' > > | AND `table` LIKE 'transaction_tab%' > > | """.stripMargin) > > > > bsTableEnv.createTemporaryView("main_db", main_db) > > bsTableEnv.createTemporaryView("merchant_db", merchant_db) > > > > val result = bsTableEnv.sqlQuery(""" > >SELECT * > >FROM ( > > SELECT t1.`table`, t1.`database`, t1.transaction_type, > > t1.transaction_id, > > t1.reference_id, t1.transaction_sn, t1.merchant_id, > > t1.status, t1.event_time > > FROM main_db as t1 > > LEFT JOIN merchant_db as t2 > > ON t1.reference_id = t2.reference_id > > WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR > >AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR > >) > > """.stripMargin) > > > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > - > > 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark > > 来驱动。 > > 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出 > > join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. > > > > > > > > > > > > > > Benchao Li 于2020年12月8日周二 下午3:23写道: > > > > > hi macia, > > > > > > 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? > > > > > > macia kk 于2020年12月8日周二 上午1:15写道: > > > > > > > 抱歉,是 >-30 and <+30 > > > > > > > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > > > > > > > 赵一旦 于2020年12月7日 周一23:28写道: > > > > > > > > > 准确点,2个条件之间没and?2个都是>? > > > > > > > > > > macia kk 于2020年12月7日周一 下午10:30写道: > >
Re: 关于 stream-stream Interval Join 的问题
hi macia, 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? macia kk 于2020年12月8日周二 上午1:15写道: > 抱歉,是 >-30 and <+30 > > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 > > 赵一旦 于2020年12月7日 周一23:28写道: > > > 准确点,2个条件之间没and?2个都是>? > > > > macia kk 于2020年12月7日周一 下午10:30写道: > > > > > 不好意思,我上边贴错了 > > > > > > SELECT * > > > FROM A > > > LEFT OUT JOIN B > > > ON order_id > > > Where A.event_time > B.event_time - 30 s > > > A.event_time > B.event_time + 30 s > > > > > > event_time 是 Time Attributes 设置的 event_time > > > > > > 这样是没有输出的。 > > > > > > > > > > > > interval join 左右表在 state 中是缓存多久的? > > > > > > > > > > > > > > > > > > > > > hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道: > > > > > > > Hi, > > > > 其中 条件是 > > > > `Where A.event_time < B.event_time + 30 s and A.event_time > > > B.event_time > > > > - 30 s ` 吧 > > > > 可以参考以下例子[1],看下有木有写错。 > > > > [1] > > > > > > > > > > https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > > > > > > > > > > Best, > > > > Hailong > > > > 在 2020-12-07 13:10:02,"macia kk" 写道: > > > > >Hi, 各位大佬 > > > > > > > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order > > > item > > > > >信息,所以 我用: > > > > > > > > > > SELECT * > > > > > FROM A > > > > > LEFT OUT JOIN B > > > > > ON order_id > > > > > Where A.event_time > B.event_time + 30 s > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > > Structural > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > -- Best, Benchao Li
Re: flink-json 函数用法
Hi, 目前Flink SQL应该还没有正式支持json函数吧,上面的报错信息看起来也是符合预期的,说的是目前还找不到这个函数。 相关信息可以参考:https://issues.apache.org/jira/browse/FLINK-9477 Yan,Yunpeng(DXM,PB) 于2020年11月30日周一 下午2:18写道: > Flink SQL> select JSON_OBJECT('product_type' VALUE product_type) > > from income_fee > > ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.sql.validate.SqlValidatorException: No match found for > function signature JSON_OBJECT(, , ) > > Flink SQL> select JSON_OBJECT('product_type' VALUE product_type) > > from sp_income_fee > > where enabled = 1 > > group by id; > [ERROR] Could not execute SQL statement. Reason: > org.apache.calcite.sql.validate.SqlValidatorException: No match found for > function signature JSON_OBJECT(, , ) > > Flink SQL> select JSON_ARRAYAGG(product_type) > > from income_fee > > where f_enabled = 1; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Unsupported Function: > 'JSON_ARRAYAGG_ABSENT_ON_NULL' > > 闫云鹏 > DXM 支付业务部 > 地址:北京市海淀区西北旺东路度小满金融总部 > 邮编:100085 > 手机:13693668213 > 邮箱:yanyunp...@duxiaoman.com > > 度小满金融 > 精于科技 值得信赖 > > > > 在 2020/11/30 11:05,“caozhen” 写入: > > 可以把使用方法和 报错信息 发下嘛? > > > > > Yan,Yunpeng(DXM,PB) wrote > > Hi: > > 尝试使用flink-sql将聚合结果json展示的时候发现flink是支持JSON_OBJECTAGG, JSON_ARRAY, > > JSON_OBJECT 等这种函数的(使用的默认的blink), > 但是总是报错函数的用法不对,有相关资料来介绍这些函数的使用方法的吗?或者示例 > > > > 闫云鹏 > > DXM 支付业务部 > > 地址:北京市海淀区西北旺东路度小满金融总部 > > 邮编:100085 > > 手机:13693668213 > > 邮箱: > > > yanyunpeng@ > > > mailto: > > > yanyunpeng@ > > > > > > > 度小满金融 > > > > 精于科技 值得信赖 > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > > -- Best, Benchao Li
Re: Flink SQL Row里嵌套Array该如何用DDL定义?
从这一行代码看出来的: https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107 现在社区还没有正式支持ProtoBuf Format,不过已经有相关issue和讨论了[1] [1] https://issues.apache.org/jira/browse/FLINK-18202 zilong xiao 于2020年11月24日周二 下午4:46写道: > 这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢? > > Benchao Li 于2020年11月24日周二 下午4:33写道: > > > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 > > > > zilong xiao 于2020年11月24日周二 下午4:13写道: > > > > > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, > > > https://github.com/yangyichao-mango/flink-protobuf > > > > > > Benchao Li 于2020年11月24日周二 下午3:43写道: > > > > > > > 看起来你的DDL写的没有什么问题。 > > > > > > > > 你用的是哪个Flink版本呢? > > > > 此外就是可以发下更完整的异常栈么? > > > > > > > > zilong xiao 于2020年11月24日周二 下午2:54写道: > > > > > > > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > > > > > > > > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > > > > > > > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > > > > > > > > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > > > > > > > > > > > [image: image.png] > > > > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > Best, > > > > > > Benchao Li > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: Flink SQL Row里嵌套Array该如何用DDL定义?
看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 zilong xiao 于2020年11月24日周二 下午4:13写道: > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, > https://github.com/yangyichao-mango/flink-protobuf > > Benchao Li 于2020年11月24日周二 下午3:43写道: > > > 看起来你的DDL写的没有什么问题。 > > > > 你用的是哪个Flink版本呢? > > 此外就是可以发下更完整的异常栈么? > > > > zilong xiao 于2020年11月24日周二 下午2:54写道: > > > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > > > > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > > > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > > > > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > > > > > > > [image: image.png] > > > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: Flink SQL Row里嵌套Array该如何用DDL定义?
看起来你的DDL写的没有什么问题。 你用的是哪个Flink版本呢? 此外就是可以发下更完整的异常栈么? zilong xiao 于2020年11月24日周二 下午2:54写道: > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > > > [image: image.png] > > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: Flink SQL Row里嵌套Array该如何用DDL定义?
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 zilong xiao 于2020年11月24日周二 上午10:49写道: > [image: image.png] > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > -- Best, Benchao Li
Re: Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常
这个问题已经解决了,在1.11.1版本应该就已经修复了。 可以贴下具体的代码和异常栈,看下是不是还有其他问题,还是使用方式的问题。 tonychen 于2020年10月26日周一 下午6:49写道: > 这个问题解决了吗?现在1.11.2仍然有这个问题,或者有什么临时解决方案, > registerFunction已经不好使了,createTemporarySystemFunction 报错 No match found for > function signature > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flinksql 不支持 % 运算
1.11的话通过配置是无法实现的。可以把这个pr[1] cherry-pick到1.11的分支上编译一下来实现1.11上使用% [1] https://github.com/apache/flink/pull/12818 夜思流年梦 于2020年10月26日周一 下午4:16写道: > flink 版本1.11 > 目前flink-sql 好像不支持取余运算,会报错: > 比如:SELECT * FROM Orders WHERE a % 2 = 0 > Percent remainder '%' is not allowed under the current SQL conformance > level > > > 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复 > > > > > 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml -- Best, Benchao Li
Re: 在使用hive catalog的情况下 json format 大小写问题
现在的json format就是支持大小写区分的吧。可以提供下你的DDL和样例数据么? 王刚 于2020年10月21日周三 下午8:08写道: > hi~ 各位大佬, > > 由于catalog是不支持大小写 如果kafka 的数据是json格式,且某些json的key区分大小写的, > > 这个时候建的在hive catalog里建的kafka表的某些json字段是取不到的 > > flink 1.11有现成解决的方案可以解决这种问题不 > -- Best, Benchao Li
Re: Re: flink sql ddl 是否支持映射多层json
嗯,道理是一样的。ROW/MAP/ARRAY这些本来就是嵌套类型,嵌套深度没有限制 Roc Marshal 于2020年10月21日周三 下午2:38写道: > 如果是深度是三层以上也是类似的嵌套语法吗?或者说是其他的写法? > > > 谢谢 > > Best Roc. > > > > > > 在 2020-09-24 20:53:12,"Benchao Li" 写道: > >这个情况现在是支持的,可以用类似于这种写法: > >```SQL > >CREATE TABLE MyTable ( > > a11 INT, > > a12 VARCHAR, > > a13 ROW > >) WITH (...) > >``` > > > >Roc Marshal 于2020年9月24日周四 下午7:54写道: > > > >> 请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢? > >> { > >> "a11":1, > >> "a12":"1", > >> "a13":{ > >> "a21":1, > >> "a22":1, > >> "a23":"1"} > >> } > >> > >> > >> 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持? > >> > >> > >> 谢谢 > > > > > > > >-- > > > >Best, > >Benchao Li > -- Best, Benchao Li
Re: flinkSQL1.11写出数据到jdbc fleld type do not match
Sink > default_catalog.default_database.cloud_behavior_sink do not match. > Query schema: [operation: VARCHAR(2147483647), operation_channel: > VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), > lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: > VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: > VARCHAR(2147483647), targets: ARRAY `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), > product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), > platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), > languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: > MAP] > Sink schema: [operation: VARCHAR(2147483647), operation_channel: > VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), > lng: > VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: > VARCHAR(2147483647)] > at > > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) > at scala.Option.map(Option.scala:146) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) > at > > com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97) > at > > com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72) > at > > com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53) > at > > com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: SQL interval join 问题
Hi Mic, 感谢关注这个issue,这个issue当前还在讨论中。 我认为问题已经定位清楚了,抄送了其他的committer同学进一步讨论确认。 Mic 于2020年10月19日周一 下午3:51写道: > 搜了一下,目前是有一个 issue 看起来相关,https://issues.apache.org/jira/browse/FLINK-18996 > 不知道处理进度如何? > 在 2020-10-19 15:03:54,"Mic" 写道: > >现有 SQL 语句如下: > >create table source1( > > id varchar PRIMARY KEY, > > a varchar, > > proctime AS PROCTIME() > >) with ( > >'connector' = 'kafka' > >... > >); > >create table source2( > > id varchar PRIMARY KEY, > > a varchar, > > proctime AS PROCTIME() > >) with ( > >'connector' = 'kafka' > >... > >); > >select > > case > >when s1.id is not null then s1.id > >else s2.id > > end as ids, > > s1.a, s2.b > >from source1 as s1 full outer join source2 as s2 on s1.id = s2.id where > s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime + > INTERVAL '5' SECOND; > > > > > >最后的 join 语句预期是 如果两个source的消息, 先后到达时间超过 10 秒,则输出, > 两条消息。 > > > > > >目前的观察结果是,如果两条消息, 先后到达时间超过10 秒,输出为:, > >为何超过 10 秒后,仍然会输出 ? > -- Best, Benchao Li
Re: 回复: flink 自定义udf注册后不能使用
Hi, 当前可以理解Flink注册UDF有三种类型: - TEMPORARY SYSTEM FUNCTION - TEMPORARY CATALOG FUNCTION - CATALOG FUNCTION 加上内置的SYSTEM FUNCTION 可以认为一共有四种,他们的解析顺序为: 1. TEMPORARY SYSTEM FUNCTION 2. SYSTEM FUNCTION 3. TEMPORARY CATALOG FUNCTION 4. CATALOG FUNCTION 所以你观察到TEMPORARY SYSTEM FUNCTION会覆盖内置函数,但是TEMPORARY CATALOG FUNCTION不会覆盖 这个现象是没有问题的。 amen...@163.com 于2020年10月16日周五 下午3:46写道: > 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug > > best, > amenhub > > 发件人: 史 正超 > 发送时间: 2020-10-16 15:26 > 收件人: user-zh@flink.apache.org > 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 > 你这样创建试一下,或者换个名字试试 > > CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS > 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; > > 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY > SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 > > > 发件人: 奔跑的小飞袁 > 发送时间: 2020年10月16日 6:47 > 收件人: user-zh@flink.apache.org > 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 > > 是的,是我传参有问题 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flinksql如何控制结果输出的频率
可以具体描述下你的问题么,没太看懂你的问题。 smallwong 于2020年10月14日周三 下午6:57写道: > 哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: flink-SQL1.11版本对map类型中value的空指针异常
是的,所以应该用createNullableConverter,而不是createConverter 史 正超 于2020年10月14日周三 上午10:45写道: > > 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is > not null. > 代码这样写的前提是,不允许对象的值为null的。 > ____ > 发件人: Benchao Li > 发送时间: 2020年10月14日 2:34 > 收件人: user-zh > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > 嗯,这应该是一个实现的bug,可以提个issue修复一下~ > > 史 正超 于2020年10月14日周三 上午10:19写道: > > > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > > > case CHAR: > > case VARCHAR: > >return avroObject -> StringData.fromString(avroObject.toString()); > > > > 所以,你的map类型的value值为null,会报空指针异常的。 > > > > 发件人: 奔跑的小飞袁 > > 发送时间: 2020年10月14日 1:46 > > 收件人: user-zh@flink.apache.org > > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > > > other_para MAP > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > -- > > Best, > Benchao Li > -- Best, Benchao Li
Re: flink-SQL1.11版本对map类型中value的空指针异常
嗯,这应该是一个实现的bug,可以提个issue修复一下~ 史 正超 于2020年10月14日周三 上午10:19写道: > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > case CHAR: > case VARCHAR: >return avroObject -> StringData.fromString(avroObject.toString()); > > 所以,你的map类型的value值为null,会报空指针异常的。 > > 发件人: 奔跑的小飞袁 > 发送时间: 2020年10月14日 1:46 > 收件人: user-zh@flink.apache.org > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > other_para MAP > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flink-SQL1.11版本对map类型中value的空指针异常
ro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > Caused by: java.lang.NullPointerException > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > > private static DeserializationRuntimeConverter > createMapConverter(LogicalType type) { > final DeserializationRuntimeConverter keyConverter = > createConverter( > DataTypes.STRING().getLogicalType()); > final DeserializationRuntimeConverter valueConverter = > createConverter( > extractValueTypeToAvroMap(type)); > return avroObject -> { > final Map map = (Map) avroObject; > Map result = new HashMap<>(); > for (Map.Entry entry : map.entrySet()) { > Object key = > keyConverter.convert(entry.getKey()); > Object value = > valueConverter.convert(entry.getValue()); > result.put(key, value); > } > return new GenericMapData(result); > }; > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: Flink的table-api不支持.
user是关键字,需要用`user`来处理一下~ Kyle Zhang 于2020年10月9日周五 上午8:34写道: > 试一试select * from OrderA orderA join OrderB orderB on > orderA.user=orderB.user > > On Sun, Oct 4, 2020 at 5:09 PM 忝忝向仧 <153488...@qq.com> wrote: > > > Hi,all: > > > > > > Table api的sql查询里面join的时候不能写"."么? > > 这样写就会报错 如下 > > Exception in thread "main" org.apache.flink.table.api.SqlParserException: > > SQL parse failed. Encountered "." at line 1, column 36. > > Was expecting one of: > >> "EXCEPT" ... > > "FETCH" ... > > "FROM" ... > > "INTERSECT" ... > > "LIMIT" ... > > "OFFSET" ... > > "ORDER" ... > > "MINUS" ... > > "UNION" ... > > "," ... > > > > > > > > Table result = tEnv.sqlQuery("select * from OrderA join OrderB on > > OrderA.user=OrderB.user"); > -- Best, Benchao Li
Re: Re: sql-cli执行sql报错
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了, 但是却报了一个非shaded的ByteArrayDeserializer。 我感觉这个应该是你自己添加了一下比较特殊的逻辑导致的。可以介绍下你对kafka connector做了哪些改造么? hl9...@126.com 于2020年9月28日周一 下午6:06写道: > 按照您的方法重试了下,又报了另一个错误: > Flink SQL> CREATE TABLE tx ( > > account_id BIGINT, > > amount BIGINT, > > transaction_time TIMESTAMP(3), > > WATERMARK FOR transaction_time AS transaction_time - > INTERVAL '5' SECOND > > ) WITH ( > > 'connector.type' = 'kafka', > > 'connector.version' = 'universal', > > 'connector.topic' = 'heli01', > > 'connector.properties.group.id' = 'heli-test', > > 'connector.properties.bootstrap.servers' = ' > 10.100.51.56:9092', > > 'connector.startup-mode' = 'earliest-offset', > > 'format.type'= 'csv' > > ); > [INFO] Table has been created. > > Flink SQL> show tables ; > tx > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: > org.apache.kafka.common.serialization.ByteArrayDeserializer is not an > instance of > org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer > > 附:lib包清单 > [test@rcx51101 lib]$ pwd > /opt/flink-1.10.2/lib > > flink-csv-1.10.2.jar > flink-dist_2.12-1.10.2.jar > flink-jdbc_2.12-1.10.2.jar > flink-json-1.10.2.jar > flink-shaded-hadoop-2-uber-2.6.5-10.0.jar > flink-sql-connector-kafka_2.11-1.10.2.jar > flink-table_2.12-1.10.2.jar > flink-table-blink_2.12-1.10.2.jar > log4j-1.2.17.jar > mysql-connector-java-5.1.48.jar > slf4j-log4j12-1.7.15.jar > > > > > hl9...@126.com > > 发件人: Leonard Xu > 发送时间: 2020-09-28 16:36 > 收件人: user-zh > 主题: Re: sql-cli执行sql报错 > Hi > benchao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard > -- Best, Benchao Li
Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
这个问题的原因应该是你的kafka partition数量应该是大于1的,并且不是所有partition都有数据导致的。 你可以检查下你的kafka topic。 目前来讲,只要你的每个kafka 的partition都有数据,那么watermark应该是可以正常产生的。跟并行度无关。 Asahi Lee <978466...@qq.com> 于2020年9月27日周日 下午6:05写道: > 你好! > 我使用flink > sql,从kafka中读取数据,然后进行sql聚合操作,然后再输出到kafka中;当我设置并行度为1时,程序执行正常;当我设置并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? > 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? -- Best, Benchao Li
Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。
你的timeout方法应该要正确的处理ResultFuture, 比如ResultFuture.complete或者completeExceptionally,如果你什么都没做,那么这个异步请求就还没有真的结束。 王敏超 于2020年9月29日周二 下午5:43写道: > AsyncDataStream > //顺序异步IO > .orderedWait(input, new AsyncDatabaseRequest(), 5000, > TimeUnit.MILLISECONDS, 1000) > > 当我没重写timeout方法的时候,会执行这个报错信息 > resultFuture.completeExceptionally(new TimeoutException("Async function > call > has timed out.")) > > > 当我重写了timeout方法,如下,程序就卡住了,求大佬解答。 > override def timeout(input: String, resultFuture: ResultFuture[Int]): > Unit > = { > println("time out ... ") > } > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: 回复: BLinkPlanner sql join状态清理
Hi Ericliuk, 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~ Ericliuk 于2020年9月29日周二 下午4:59写道: > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。 > < > http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png> > > > 不太清楚为什么用了mini batch就没读取这个配置。 > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: 如何在流式数据源上使用分析函数LAG和EAD函数
Hi Robin, 目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work, 是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。 这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。 当前如果你想实现类似功能,可以先自己写一个udaf来做。 [1] https://issues.apache.org/jira/browse/FLINK-19449 Robin Zhang 于2020年9月29日周二 下午2:04写道: > 环境: flink 1.10,使用flinkSQL > > kafka输入数据如: > {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0} > {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0} > {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0} > {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0} > {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0} > {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0} > > sql如下: > > INSERT INTO topic_sink > SELECT > t, > id, > speed, > LAG(speed, 1) OVER w AS speed_1, > LAG(speed, 2) OVER w AS speed_2 > FROM topic_source > WINDOW w AS ( > PARTITION BY id > ORDER BY t > ) > 我期望得到的结果数据是 > {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null, > "speed_2":null} > {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0, > "speed_2":null} > {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0, > "speed_2":1.0} > {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0, > "speed_2":2.0} > {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, > "speed_2":3.0} > {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, > "speed_2":4.0} > > 实际得到的结果数据是: > {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, > "speed_2":1.0} > {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, > "speed_2":2.0} > {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, > "speed_2":3.0} > {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, > "speed_2":4.0} > {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, > "speed_2":5.0} > {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, > "speed_2":6.0} > > 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: sql-cli执行sql报错
(1) 的方式相当于一个shade之后的包,会把所有compile的依赖都打进去。 (2) 的方式的话,你需要自己手工添加所有这个connector的依赖,比如你提到的kafka-clients。 而且,kafka-clients本身的依赖如果你没有打到kafka-clients这个包里面的话,那你也需要把 那些compile依赖也都放进来。所以相当于手工做了一遍maven的依赖处理,而且要想全部都 放进来,应该会有很多。 如果你对kafka-clients有修改,建议自己重新依赖自己修改后的kafka-clients打包一个kafka-sql-connector-kafka 赵一旦 于2020年9月28日周一 下午5:51写道: > > 看了下pom,在flink-sql-connector-kafka中依赖了flink-connector-kafka-**,该包又依赖了flink-connector-kafka-base-**以及kafka-client。 > 然后flink-sql-connector-kafka做了shade。 > > 所以看下来,我的那个(1)和(2)理论上效果是一样的。 > > > 顺便讲下,我kafka-clients更换了ssl证书读取方式,用于支持hdfs等分布式协议(直接复用了flink-core中的filesystem实现)。 > > 赵一旦 于2020年9月28日周一 下午5:42写道: > > > > 这个不是很懂,(1)flink-connector-kafka_2.11-1.11.2.jar+flink-connector-kafka-base_2.11-1.11.2.jar+kafka-clients-0.11.0.3.jar > > 和(2)flink-sql-connector-kafka**.jar是啥区别呢? > > > > 使用(1)可以不?因为我的kafka-clients部分是调整了源码的。 > > > > Leonard Xu 于2020年9月28日周一 下午4:36写道: > > > >> Hi > >> benchao的回复是的对的, > >> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > >> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > >> > >> > >> > 相关lib包: > >> > flink-connector-kafka_2.12-1.10.2.jar > >> > kafka-clients-0.11.0.3.jar > >> > >> 祝好 > >> Leonard > > > > > -- Best, Benchao Li
Re: 回复:sql-cli执行sql报错
kafka的依赖应该是依赖shaded之后的版本,也就是flink-*sql*-connector-kafka***.jar hl9...@126.com 于2020年9月28日周一 上午10:29写道: > 确实语法不对。我用了1.10的语法后,执行sql又报了另外一个错误: > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > > 相关的lib依赖包如下: > [root@rcx51101 lib]# pwd > /opt/flink-1.10.2/lib > [root@rcx51101 lib]# ll | grep kafka > -rw-rw-r-- 1 test test 26169 Sep 28 10:21 > flink-connector-kafka-0.10_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 54969 Sep 28 10:21 > flink-connector-kafka-0.11_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 37642 Sep 28 10:21 > flink-connector-kafka-0.9_2.11-1.10.2.jar > -rw-rw-r-- 1 test test 81912 Aug 17 16:41 > flink-connector-kafka_2.12-1.10.2.jar > -rw-rw-r-- 1 test test106632 Sep 28 10:22 > flink-connector-kafka-base_2.11-1.10.2.jar > -rw-rw-r-- 1 test test106632 Aug 17 16:36 > flink-connector-kafka-base_2.12-1.10.2.jar > -rw-rw-r-- 1 test test 1893564 Jul 24 2018 kafka-clients-2.0.0.jar > > > > hl9...@126.com > > 发件人: 111 > 发送时间: 2020-09-28 09:23 > 收件人: user-zh@flink.apache.org > 主题: 回复:sql-cli执行sql报错 > 你貌似使用的是flink-1.11的语法。 > 可以修改成flink-1.10的语法试试,参考文档: > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector > > > | | > xinghalo > | > | > xingh...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年09月28日 09:16,hl9...@126.com 写道: > flink版本1.10.2,问题重现如下,请问各位大佬是什么原因: > > ./sql-client.sh embedded > Flink SQL> show tables ; > [INFO] Result was empty. > > Flink SQL> CREATE TABLE tx ( > account_id BIGINT, > amount BIGINT, > transaction_time TIMESTAMP(3), > WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'heli01', > 'properties.bootstrap.servers' = '10.100.51.56:9092', > 'format'= 'csv' > ); > [INFO] Table has been created. > > Flink SQL> select * from tx ; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find > a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. > > Reason: Required context properties mismatch. > > The following properties are requested: > connector=kafka > format=csv > properties.bootstrap.servers=10.100.51.56:9092 > schema.0.data-type=BIGINT > schema.0.name=account_id > schema.1.data-type=BIGINT > schema.1.name=amount > schema.2.data-type=TIMESTAMP(3) > schema.2.name=transaction_time > schema.watermark.0.rowtime=transaction_time > schema.watermark.0.strategy.data-type=TIMESTAMP(3) > schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND > topic=heli01 > > The following factories have been considered: > org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > > > > hl9...@126.com > -- Best, Benchao Li
Re: Flink SQL 1.11.1 executeSql/SqlUpdate时 SQL validation的一些问题
我理解这个是calcite的机制导致的。calcite并不知道Flink一共有多少层schema,其实Flink 自己抽象了三层,也就是catalog.database.table 但是配置CalciteCatalogReader的时候,需要配置一些默认的schema查找规则,这个Flink是配置了两个, 也就是默认的catalog 和 默认的catalog+默认的database 然后calcite在查找的时候会先尝试default_catalog.default_database作为schema,去查找a.b,此时会先把a当做table去查找,并且找不到。 接下来会default_catalog作为schema去查找a.b,此时就找到了。 刘首维 于2020年9月25日周五 下午3:41写道: > Hi all, > > > 今天在调试1.11 Flink 代码的时候,发现一个没太理解的现象 > > > 考虑以下code > > > > bsTableEnv.executeSql("create database a") > bsTableEnv.executeSql( " CREATE TABLE a.b "(后略)) > bsTableEnv.executeSql("select * from a.b") > > > 然后发现了以下现象: > > > 从图中可以得知,在`DatabaseCalciteSchema` 中 > 我发现下面几个奇怪的点 > >1. databaseName 是 ‘'default' >2. getTable将 `a`作为参数传入,而不是b (a是库名,b是表名) > > > > 首先可以确定的是这个发生在validation阶段 > > 其次我发现特意针对这块做了一次catch `TableNotExistException`的操作 > > 请问这部分代码的用途和目的是? > > > > -- Best, Benchao Li
Re: 回复:flink sql count问题
Hi, 试试用这种方式呢:count(1) filter (where name like '南京%') anonnius 于2020年9月27日周日 下午5:29写道: > select count(nullif(if(name not like '南京%', '其他', '南京'), '其他')) > > > > > > > > > > > > > > > > > > 在 2020-09-27 17:23:07,"zya" 写道: > >你好,链接无法显示,能麻烦再贴下吗 > > > > > >--原始邮件-- > >发件人: > "user-zh" > < > anonn...@126.com; > >发送时间:2020年9月27日(星期天) 下午5:20 > >收件人:"user-zh" > > >主题:Re:回复:flink sql count问题 > > > > > > > >hi: '其他', '南京'), '其他')) > >在 2020-09-27 17:07:39,"zya" >貌似只能这样了,感谢回答 > > > > > > > > > >--nbsp;原始邮件nbsp;-- > >发件人: > "user-zh" > >发送时间:nbsp;2020年9月27日(星期天) 下午5:03 > >收件人:nbsp;"user-zh" > > >主题:nbsp;Re:回复:flink sql count问题 > > > > > > > >你count 也会生成记录啊。 你过滤掉就行nbsp;nbsp; 。 比如 having xxxnbsp; > ,或者加个filter > >在 2020-09-27 17:01:06,"zya" >gt;这个是我现在的做法,但是的问题就是使用sum会在条件没满足时也会在mysql中生成一条记录 > >gt;amp;nbsp; > >gt; > >gt; > >gt; > >gt; > > >gt;--amp;nbsp;原始邮件amp;nbsp;-- > >gt;发件人:nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; > "user-zh"nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; > >gt;发送时间:amp;nbsp;2020年9月27日(星期天) 下午4:59 > >gt;收件人:amp;nbsp;"user-zh" amp;gt;; > >gt; > >gt;主题:amp;nbsp;Re:flink sql count问题 > >gt; > >gt; > >gt; > >gt;最好把null 变成0,amp;nbsp; 你这样amp;nbsp;amp;nbsp; > sum(if(name like '南京%',1 , 0)) > >gt;在 2020-09-27 16:53:56,"zya" 写道: > >gt;amp;gt;请教各位: > >gt;amp;gt;我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录, > >gt;amp;gt;之前在hive中是这么写的:count(if(name like '南京%',1 , > null)),但是flink sql中count不能为null,有什么别的方法能实现该功能吗? > >gt;amp;gt;使用的是flink1.10.1 blink > >gt;amp;gt;amp;amp;nbsp; > -- Best, Benchao Li
Re: Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢
可以的。有一个实验性质的Fast Emit功能,可以通过如下参数开启: table.exec.emit.early-fire.enabled = true table.exec.emit.early-fire.delay = 10s Michael Ran 于2020年9月27日周日 下午2:49写道: > 额,不是5分钟窗口,10秒一个步长往前滑动吗? 我以为滚动是5分钟窗口 5分钟一输出呢。。 > 在 2020-09-27 14:43:57,"赵一旦" 写道: > >不是滑动窗口哈。是滚动窗口,每10秒触发一次输出。滑动窗口的化逻辑就变了。 > > > >Michael Ran 于2020年9月27日周日 下午2:39写道: > > > >> 滑动窗口 > >> 在 2020-09-27 13:25:37,"赵一旦" 写道: > >> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢? > >> > -- Best, Benchao Li
Re: 关于flink sql的数据类型
1.11的话, string类型是允许:"a":"abc" 和 "a": 123这两种形式的 bigint类型的话:"a": 123 和 "a": "123"也都是合法的 默认如果是字段不存在,会用null来表示; 如果字段解析错误,会抛异常,如果配置了ignoreParseError,则会忽略整条数据。 不知道你上面提到的(1)是怎么测出来的,方便把具体的DDL定义和示例数据贴一下吗? 赵一旦 于2020年9月25日周五 下午2:52写道: > 我基于1.11测试的。目前来看,json format的2个设置都设置好。然后event > time部分使用COALESCE将null情况设置为event_time 0。这么做是最好的情况啦。 > > Benchao Li 于2020年9月25日周五 下午2:08写道: > > > 你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。 > > > > 赵一旦 于2020年9月25日周五 上午11:02写道: > > > > > 而且按照string无法接受"a":a,bigint在 "t":"as"情况下会为null。 > > > 这么来看,bigint反而比string还通用了,可以将非法数据通过null录入进来。 > > > string方式反而丢失部分信息了还。 > > > > > > 赵一旦 于2020年9月25日周五 上午10:57写道: > > > > > > > 今天做个测试,发现一些类型的特点,想确认下。 > > > > > > > > 目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。 > > > > > > > > > > > > 发现如下几个特征 > > > > (1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。 > > > > (2)非顶层字段,比如d map,d中的字段 "b": 12则是合法的。 > > > > (3)t字段为bigint类型,并且由此衍生了eventtime。 > > > > 如果数据为 t: abc 则数据直接非法被忽略。 > > > > 如果数据为t: "abc",则t被转为null? > > > > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。 > > > > > > > > > > > > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。 > > > > > > > > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。 > > > > 我是通过COALESCE解决的。 > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: 关于flink sql的数据类型
你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。 赵一旦 于2020年9月25日周五 上午11:02写道: > 而且按照string无法接受"a":a,bigint在 "t":"as"情况下会为null。 > 这么来看,bigint反而比string还通用了,可以将非法数据通过null录入进来。 > string方式反而丢失部分信息了还。 > > 赵一旦 于2020年9月25日周五 上午10:57写道: > > > 今天做个测试,发现一些类型的特点,想确认下。 > > > > 目前来看,kafka数据的2个配置,(1)不存在字段设置null(2)解析错误忽略。 > > > > > > 发现如下几个特征 > > (1)顶层字段字符串情况,实际数据为 "a": "a" 合法,"a":12不合法。 > > (2)非顶层字段,比如d map,d中的字段 "b": 12则是合法的。 > > (3)t字段为bigint类型,并且由此衍生了eventtime。 > > 如果数据为 t: abc 则数据直接非法被忽略。 > > 如果数据为t: "abc",则t被转为null? > > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。 > > > > > > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。 > > > > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。 > > 我是通过COALESCE解决的。 > > > -- Best, Benchao Li
Re: flink sql ddl 是否支持映射多层json
这个情况现在是支持的,可以用类似于这种写法: ```SQL CREATE TABLE MyTable ( a11 INT, a12 VARCHAR, a13 ROW ) WITH (...) ``` Roc Marshal 于2020年9月24日周四 下午7:54写道: > 请教个问题,flink sql 流模式链接kafka的时候,message格式是多层的json,怎么对某个深度大于1的字段进行映射呢? > { > "a11":1, > "a12":"1", > "a13":{ > "a21":1, > "a22":1, > "a23":"1"} > } > > > 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持? > > > 谢谢 -- Best, Benchao Li
Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
我们一般提升作业吞吐能力的步骤就是看作业的反压情况, - 如果作业完全没有反压,说明此时处理能力大于上游数据产生速度 - 如果作业有反压,就具体看下反压的是哪个算子,存在什么瓶颈。比如网络IO、磁盘IO、CPU; 当然,有时候内存问题也会表现为CPU现象,比如GC比较严重 范超 于2020年9月24日周四 上午10:48写道: > 谢谢Benchao哥回复。 > > 这几天一直忙着压测这个问题。 > 经多轮压测(先灌满kafka数据),再去消费。 > 发现确实是您说的问题中的第三个情况 > 由于kafka的topic只开了一个partition > > 所以flinkkafkaconsumer按照一个taskmanger对应了一个kafka的parition的方式进行了处理。从而导致虽然作业并发度够大,但是由于只有一个partition, > 其他并发的taskmanager无法获取到更多的partition进行消费,从而导致并行度提升而作业消费能力却无法同比增大。 > > 之后通过建立2个partition的topic,实现了消费能力的翻倍。 > > > 想再请多问您一句,我如果想压出作业的极限吞吐量,请问该如何设置一些运行参数,目前我通过设置on yarn > 的tm的内存大小,kafka的partition数目,也无法将作业的吞吐量压上去。 > > > > -邮件原件- > 发件人: Benchao Li [mailto:libenc...@apache.org] > 发送时间: 2020年9月18日 星期五 18:49 > 收件人: user-zh > 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以 > > 提交两个作业的话,两个作业是完全独立的,都会消费全量数据。 > > 一个作业的消费能力不行,可以具体看下瓶颈在哪里,比如: > 1. 作业是否有lag,如果没有lag,那其实是没有问题的 > 2. 如果作业有lag,而且lag还在上涨,说明当前消费能力不足,此时可以看下作业具体的瓶颈在哪里 > 有可能是某个算子在反压导致整个作业的消费能力不足 > 也有可能是作业的整体CPU资源不足导致的 > 也有一种极端情况是,作业的并发度已经足够大,source subtask已经对应一个kafka > partition了,但是消费能力还是不足,这个时候其实是单个partition数据量太大,对应到Flink的source算子处理能力不足导致的 > 3. 如果作业当前有lag,但是lag在下降,说明消费能力其实是够的,只是数据有些积压 > > 范超 于2020年9月18日周五 下午4:07写道: > > > 各位好,我遇到了一个奇怪的问题 > > > > 我是使用flink1.10和 flink-connector-kafka_2.11 > > > > 使用Flink on yarn 模式运行,无论怎么调大并行度。Kafka节点(我使用的单节点)的网卡输出速度一直上不去。 > > > > 但是提交两个同样的应用同样使用FLink on Yarm模式,Kafka节点的网卡输出速度是正常翻倍的。 > > > > 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。 > > > > 求各位大佬指导 > > > > > -- > > Best, > Benchao Li > -- Best, Benchao Li
Re: flink sql延迟数据
这个目前还不能,但是在1.12是可以的,已经在这个issue[1] 中添加了这个功能 [1] https://issues.apache.org/jira/browse/FLINK-18555 ang <806040...@qq.com> 于2020年9月24日周四 上午11:19写道: > 感谢benchao,请问下这部分只能通过config来设置吗,有没有可以直接在sql中设置的配置项 > > > > > > > > --原始邮件-- > 发件人: > "user-zh" > < > libenc...@apache.org; > 发送时间:2020年9月23日(星期三) 下午5:22 > 收件人:"user-zh" > 主题:Re: flink sql延迟数据 > > > > 你是用的Blink planner的TUMBLE window么,如果是的话,可以通过设置state > retention[1]时间来处理late数据的。 > 具体的allow lateness的时间就是你设置的min retention time > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time > > ang <806040...@qq.com 于2020年9月23日周三 下午4:24写道: > > hi各位,有个问题请教一下: > 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event > time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为 > WATERMARK FOR ts AS tsnbsp; - INTERVAL '5' SECODND > ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream > api里面可以使用allowed > lateness,但是这部分在sql中没看到有相关语法 > > > Flink版本1.10.1 > nbsp; > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: [DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"
+1 nashcen <2415370...@qq.com> 于2020年9月24日周四 下午1:09写道: > +1 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flink sql延迟数据
你是用的Blink planner的TUMBLE window么,如果是的话,可以通过设置state retention[1]时间来处理late数据的。 具体的allow lateness的时间就是你设置的min retention time [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time ang <806040...@qq.com> 于2020年9月23日周三 下午4:24写道: > hi各位,有个问题请教一下: > 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event > time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为 > WATERMARK FOR ts AS ts - INTERVAL '5' SECODND > ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream api里面可以使用allowed > lateness,但是这部分在sql中没看到有相关语法 > > > Flink版本1.10.1 > -- Best, Benchao Li
Re: flink pb转json性能问题
Hi kandy, 关于第1个问题,目前社区有计划做一个内置的pb format[1],可能大概率赶不上1.12了,不过应该1.13差不多。 [1] https://issues.apache.org/jira/browse/FLINK-18202 kandy.wang 于2020年9月23日周三 下午4:55写道: > 因flink目前不支持pb format,调用了,protobuf-java-util > com.google.protobuf.utilJsonFormat.printer().preservingProtoFieldNames().print(message) > 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json, > 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps. > 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2 > 、社区对pb format 会支持么? > 3、pb转json 有什么性能比较好的工具包 -- Best, Benchao Li
Re: [flink-1.10.2] Blink SQL 超用内存严重
超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的, 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。 郑斌斌 于2020年9月23日周三 下午12:29写道: > 我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN > KILL 。 > 单流跑的话,比较正常。 > JOB的内存是4G。版本1.11.1 > ------ > 发件人:Benchao Li > 发送时间:2020年9月23日(星期三) 10:50 > 收件人:user-zh > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 > > Hi Tianwang, > > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 > > 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 > join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 > `Math.max(leftRelativeSize, rightRelativeSize) + > allowedLateness`,根据你的SQL,这个值应该是6h > 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 > 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / > 2;`,在你的SQL来讲,就是3h,也就是说 > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] > > 希望这个可以解答你的疑惑~ > > [1] https://issues.apache.org/jira/browse/FLINK-18996 > > Tianwang Li 于2020年9月22日周二 下午8:26写道: > > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > > > > 【join】 > > > > > SELECT `b`.`rowtime`, > > > `a`.`c_id`, > > > `b`.`openid` > > > FROM `test_table_a` AS `a` > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > > AND `a`.`openid` = `b`.`openid` > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' > SECOND > > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > > > > 【window】 > > > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) > AS > > > `rowtime`, > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__windoow_start__`, > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > > `__window_end__`, > > > `c_id`, > > > COUNT(`openid`) AS `cnt` > > > FROM `test_table_in_6h` > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > > `c_id` > > > > > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > > > 【配置】 > > > > > cat conf/flink-conf.yaml > > > jobmanager.rpc.address: flink-jobmanager > > > taskmanager.numberOfTaskSlots: 1 > > > blob.server.port: 6124 > > > jobmanager.rpc.port: 6123 > > > taskmanager.rpc.port: 6122 > > > jobmanager.heap.size: 6144m > > > taskmanager.memory.process.size: 4g > > > taskmanager.memory.jvm-overhead.min: 1024m > > > taskmanager.memory.jvm-overhead.max: 2048m > > > taskmanager.debug.memory.log-interval: 1 > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > > -XX:NumberOfGCLogFiles=10 > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > > > > > > -- > > ** > > tivanli > > ** > > > > > -- > > Best, > Benchao Li > > -- Best, Benchao Li
Re: [flink-1.10.2] Blink SQL 超用内存严重
Hi Tianwang, 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游 join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是 `Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness`,根据你的SQL,这个值应该是6h 2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的 清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;`,在你的SQL来讲,就是3h,也就是说 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] 希望这个可以解答你的疑惑~ [1] https://issues.apache.org/jira/browse/FLINK-18996 Tianwang Li 于2020年9月22日周二 下午8:26写道: > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > 【join】 > > > SELECT `b`.`rowtime`, > > `a`.`c_id`, > > `b`.`openid` > > FROM `test_table_a` AS `a` > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > > AND `a`.`openid` = `b`.`openid` > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0' SECOND > > AND `a`.`rowtime` + INTERVAL '6' HOUR > > > > > 【window】 > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `rowtime`, > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__windoow_start__`, > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS > > `__window_end__`, > > `c_id`, > > COUNT(`openid`) AS `cnt` > > FROM `test_table_in_6h` > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR), > > `c_id` > > > > > 我配置了Fink的内存是4G, 实际使用达到了6.8 G。 > 同样的逻辑,我使用Stream API实现,使用的内存只有4.5G左右 > > 【配置】 > > > cat conf/flink-conf.yaml > > jobmanager.rpc.address: flink-jobmanager > > taskmanager.numberOfTaskSlots: 1 > > blob.server.port: 6124 > > jobmanager.rpc.port: 6123 > > taskmanager.rpc.port: 6122 > > jobmanager.heap.size: 6144m > > taskmanager.memory.process.size: 4g > > taskmanager.memory.jvm-overhead.min: 1024m > > taskmanager.memory.jvm-overhead.max: 2048m > > taskmanager.debug.memory.log-interval: 1 > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation > -XX:NumberOfGCLogFiles=10 > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > -- > ** > tivanli > ** > -- Best, Benchao Li
Re: 消费kafka source反压
这个性能影响指的是跟那种情况进行对比呢? smq <374060...@qq.com> 于2020年9月21日周一 下午6:49写道: > 谢谢,多问一句,并行度为1的话,keyby算子加上keydstate对性能影响大吗 > > > > ---原始邮件--- > 发件人: "Benchao Li" 发送时间: 2020年9月21日(周一) 下午4:39 > 收件人: "user-zh" 主题: Re: 消费kafka source反压 > > > 这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。 > > smq <374060...@qq.com 于2020年9月21日周一 下午2:08写道: > > > > 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗? > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
可以通过SQL的where条件来过滤吧 chuyuan 于2020年9月21日周一 下午6:48写道: > 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
libMap = (Map) > JSON.parse(pointDO.getLib()); > } > this.setLib(libMap); > } > } > > 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: > "CREATE TABLE test.test(" + > " type STRING," + > " lib MAP," > + > " properties > MAP" + > ") PARTITIONED BY (" + > " dt string" + > " ) stored as orcfile " + > " TBLPROPERTIES" + > " (" + > > "'partition.time-extractor.kind'='custom'," + > > "'partition.time-extractor.timestamp-pattern'='$dt'," + > > > "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," > + > > "'sink.partition-commit.trigger'='partition-time'," + > > "'sink.partition-commit.delay'='0s'," + > > "'sink.partition-commit.policy.kind'='metastore'" + > ")"); > > > 第三步,把临时表的数据insert into到目标表,此时出现异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > > 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为: > |-- lib: LEGACY('RAW', 'ANY') > |-- properties: LEGACY('RAW', 'ANY') > |-- track_id: BIGINT > |-- type: STRING > ,这说明lib LEGACY('RAW', 'ANY')无法匹配hive目标表中lib > MAP数据结构,写入失败,大概流程是这样。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: 消费kafka source反压
这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。 smq <374060...@qq.com> 于2020年9月21日周一 下午2:08写道: > > 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗? -- Best, Benchao Li
Re: Re: [SQL] parse table name from sql statement
我感觉可以先把SQL转成RelNode,然后用Calcite的visitor模式的RelShuttle来获取? Harold.Miao 于2020年9月21日周一 下午1:58写道: > 主要是我没有完整的所有单元case, 总是感觉写的不完整。 > > 郭士榕 于2020年9月21日周一 上午11:08写道: > > > > > > > > > 就是要一个一个判断做解析下推的,比如你举的SqlJoin例子, 然后继续left,right下推。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2020-09-21 10:50:31,"Harold.Miao" 写道: > > >主要是嵌套回溯特别复杂, 例如getFrom之后后面可能又是嵌套一个SqlJoin等等类似情况太多。 还有要做很多的类型转换。 > > > > > >郭士榕 于2020年9月21日周一 上午10:21写道: > > > > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。 > > >> > > >> > > >> > > >> > > >> > > >> 在 2020-09-21 10:12:13,"Harold.Miao" 写道: > > >> >hi all > > >> > > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。 > > >> > > > >> >谢谢 > > >> > > > >> >-- > > >> > > > >> >Best Regards, > > >> >Harold Miao > > >> > > > > > > > > >-- > > > > > >Best Regards, > > >Harold Miao > > > > > -- > > Best Regards, > Harold Miao > -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
Hi chuyuan, 可以详细描述下你遇到的问题么,比如下面这些信息 - 用的是哪个Flink版本 - SQL(包括DDL和query) - 数据是什么样子的 chuyuan 于2020年9月21日周一 下午2:40写道: > LEGACY('RAW', > 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > 方便说下具体实现细节吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
提交两个作业的话,两个作业是完全独立的,都会消费全量数据。 一个作业的消费能力不行,可以具体看下瓶颈在哪里,比如: 1. 作业是否有lag,如果没有lag,那其实是没有问题的 2. 如果作业有lag,而且lag还在上涨,说明当前消费能力不足,此时可以看下作业具体的瓶颈在哪里 有可能是某个算子在反压导致整个作业的消费能力不足 也有可能是作业的整体CPU资源不足导致的 也有一种极端情况是,作业的并发度已经足够大,source subtask已经对应一个kafka partition了,但是消费能力还是不足,这个时候其实是单个partition数据量太大,对应到Flink的source算子处理能力不足导致的 3. 如果作业当前有lag,但是lag在下降,说明消费能力其实是够的,只是数据有些积压 范超 于2020年9月18日周五 下午4:07写道: > 各位好,我遇到了一个奇怪的问题 > > 我是使用flink1.10和 flink-connector-kafka_2.11 > > 使用Flink on yarn 模式运行,无论怎么调大并行度。Kafka节点(我使用的单节点)的网卡输出速度一直上不去。 > > 但是提交两个同样的应用同样使用FLink on Yarm模式,Kafka节点的网卡输出速度是正常翻倍的。 > > 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。 > > 求各位大佬指导 > -- Best, Benchao Li
Re: 关于flinksql 滑动窗口数据进不来的问题
那看起来就是watermark的问题了。你可以在Flink web UI上查看一下对应的算子的watermark是否符合预期。 有一个小tip,watermark本身是由数据来驱动更新的。比如你只有一条数据,那么你的watermark就只能是根据 这条数据计算出来的,不会自动再更新。 李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午5:27写道: > 在进入stream之前是有数据的,使用hop方法计算之后就没有数据流出了。 > > > 水印的设置代码如下: > simpleResults.assignTimestampsAndWatermarks(WatermarkStrategy > . .withTimestampAssigner((event, > timestamp)-event.getGmtPaidLong()) > .withIdleness(Duration.ofSeconds(5))); > --- > 另外 刚刚我用了processTime做窗口滑动是可以实现的,但是processTime对业务不友好,因此如果根据rowTime可以做是最好的。 > > > --原始邮件-- > 发件人: > "user-zh" > < > libenc...@apache.org; > 发送时间:2020年9月14日(星期一) 下午5:19 > 收件人:"user-zh" > 主题:Re: 关于flinksql 滑动窗口数据进不来的问题 > > > > 可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢? > > 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。 > > 李杨烨 <438106...@qq.com 于2020年9月14日周一 下午1:32写道: > > 刚刚邮件图片挂了,上传了新的图片地址: > http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg > 使用rowTime做的滑动 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: 关于flinksql 滑动窗口数据进不来的问题
可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢? 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。 李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午1:32写道: > 刚刚邮件图片挂了,上传了新的图片地址:http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg > 使用rowTime做的滑动 -- Best, Benchao Li
Re: 关于使用flinksql 生成滑动窗口 table数据进不来的问题
你好,你的图片挂了,可以把图片放到第三方图床工具然后把链接发出来。或者直接用文本描述的问题。 李杨烨 <438106...@qq.com> 于2020年9月14日周一 上午11:25写道: > > 根据rowTime做的滑动 -- Best, Benchao Li
Re: UDAF函数在over窗口使用问题
Hi, 看起来你并没有实现`retract` 方法,正常来讲,over window在处理过期数据的时候,会将过期的数据进行一次retract计算。 所以你需要正确的实现一下retract方法。 chen310 <1...@163.com> 于2020年9月14日周一 上午10:01写道: > flink版本 1.11.1 > > 实现了一个UDAF聚集函数,将窗口内某些字段合并成一个字符串。代码如下: > > public class AggDistinctDetail extends AggregateFunction AggDistinctDetail.Details> { > private static final Logger logger = > LoggerFactory.getLogger(AggDistinctDetail.class); > > public static class Details { > public Set set; > } > > @Override > public Details createAccumulator() { > return new Details(); > } > > @Override > public String getValue(Details acc) { > return JSON.toJSONString(acc.set); > } > > public void accumulate(Details acc, String val) { > if (acc.set == null) { > acc.set = new HashSet<>(); > } > acc.set.add(val); > } > > public void retract(Details acc, String val) { > //now, agg detail don't need support retraction > } > > public void merge(Details acc, Iterable it) { > Iterator iter = it.iterator(); > if (acc.set == null) { > acc.set = new HashSet<>(); > } > while (iter.hasNext()) { > Details a = iter.next(); > acc.set.addAll(a.set); > } > } > > public void resetAccumulator(Details acc) { > acc.set = null; > } > } > > 将此UDAF使用在over窗口上,此窗口按realIp分区,以消息中事件时间(EventTime) > requestDateTime向前推24小时作为窗口,统计窗口内realIp对应的所有userId,作为明细输出userId聚集后的字符串。 > > drop function if exists UDF_InfoDistinctMerge; > create function UDF_InfoDistinctMerge AS > 'com.binance.risk.flink.udf.AggDistinctDetail'; > > select > realIp , > UDF_InfoDistinctMerge(userId) over w1 as userSet > from source_table > window w1 as (partition by realIp order by requestDateTime asc RANGE > BETWEEN > INTERVAL '24' hour preceding AND CURRENT ROW) ; > > 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。 > > 问题: > 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 lec ssmi 于2020年9月10日周四 下午2:35写道: > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? > 感觉底层和 last_value() group by id是一样的。 > > Benchao Li 于2020年9月10日周四 上午10:34写道: > > > > > > 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 > > 如果还有自己的binlog格式,也可以自定义format来实现。 > > > > 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 > > 1. append / update_after 消息会累加到聚合指标上 > > 2. delete / update_before 消息会从聚合指标上进行retract > > > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html > > [2] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html > > > > 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > > > > > 请问第1点是有实际的案例使用了么? > > > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > > > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > > > 谢谢. > > > > > > > > > > > > > > > --原始邮件-- > > > 发件人: > > > "user-zh" > > > < > > > libenc...@apache.org; > > > 发送时间:2020年9月9日(星期三) 中午1:09 > > > 收件人:"user-zh" > > > > > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > > > > > > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > > > 1. 首先版本是1.11+, 可以直接用binlog > > > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > > > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > > > yyy这种,那这个sum指标会自动做好这件事。 > > > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] > > 将append数据流转成retract数据流,这样下游再用同样的 > > > 聚合逻辑,效果也是一样的。 > > > > > > [1] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > > > > > > > xuzh > > > > > 场景: > > > nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > > > nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > > > nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka > ,GMV实时统计为1000. > > > nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > > > 这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp; > > > 请问是不是根据 update /delete > > 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > > > > > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > > > > > > > > > -- > > > > > > Best, > > > Benchao Li > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。 如果还有自己的binlog格式,也可以自定义format来实现。 只要source端产生了changelog数据,后面的算子是可以自动处理update消息的,简单理解,你可以认为 1. append / update_after 消息会累加到聚合指标上 2. delete / update_before 消息会从聚合指标上进行retract [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/canal.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html 忝忝向仧 <153488...@qq.com> 于2020年9月9日周三 下午10:54写道: > 请问第1点是有实际的案例使用了么? > 意思是1.11+可以在sql层面,决定聚合计算是update_before那条记录还是update_after那条记录? > 这个决定采用哪条是在哪里标识的?Flink可以知道是取after的还是before的 > 谢谢. > > > > > --原始邮件-- > 发件人: > "user-zh" > < > libenc...@apache.org; > 发送时间:2020年9月9日(星期三) 中午1:09 > 收件人:"user-zh" > 主题:Re: flink实时统计GMV,如果订单金额下午变了该怎么处理 > > > > 不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: > 1. 首先版本是1.11+, 可以直接用binlog > format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink > 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by > yyy这种,那这个sum指标会自动做好这件事。 > 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 > 聚合逻辑,效果也是一样的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication > > > xuzh > 场景: > nbsp; nbsp;实时统计每天的GMV,但是订单金额是会修改的。 > nbsp; 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > nbsp; 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > nbsp; 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。nbsp;nbsp; > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: flink实时统计GMV,如果订单金额下午变了该怎么处理
不知道你是用的SQL还是DataStream API,如果用的是SQL的话,我感觉可以这么玩: 1. 首先版本是1.11+, 可以直接用binlog format,这样数据的修改其实会自动对应到update_before和update_after的数据,这样Flink 内部的算子都可以处理好这种数据,包括聚合算子。比如你是select sum(xxx) from T group by yyy这种,那这个sum指标会自动做好这件事。 2. 如果不用binlog模式,只是取最新的数据来做聚合计算,也可以用去重算子[1] 将append数据流转成retract数据流,这样下游再用同样的 聚合逻辑,效果也是一样的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication xuzh 于2020年9月8日周二 下午5:56写道: > 场景: > 实时统计每天的GMV,但是订单金额是会修改的。 > 订单存储在mysql,通过binlog解析工具实时同步到kafka.然后从kafka实时统计当日订单总额。 > 假设订单009 上午10点生成,金额为1000. 生成一条json数据到kafka ,GMV实时统计为1000. > 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。 > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > 刚入坑实时处理,请大神赐教 -- Best, Benchao Li
Re: how flink-sql connector kafka reads array json
Hi, 这个是一个已知的问题,已经有issue[1] 在跟进解决了。预计在1.12可以使用。 [1] https://issues.apache.org/jira/browse/FLINK-18590 大罗 于2020年9月8日周二 上午10:39写道: > hi,大家好,我遇到一个问题。 > > 下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming", > "age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql? > > CREATE TABLE mykafka1 (name String, age Int) > WITH ( >'connector.type' = 'kafka', >'format.type' = 'json', >'update-mode' = 'append' > ); > > > 还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource? > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: FlinkSQL如何处理上游的表结构变更
我理解SQL本身都是强类型的,要处理这种schema会变更的情况可能本身就不是很合适。 CC 云邪,不知道云邪大佬对这个有没有更好的想法。 忝忝向仧 <153488...@qq.com> 于2020年9月5日周六 下午4:50写道: > 同问如果是上游表结构变更没有及时通知到下游,数据同步这块就会报错 > 有没什么办法解决? > > > > > --原始邮件-- > 发件人: > "user-zh" > < > flin...@163.com; > 发送时间:2020年9月4日(星期五) 下午2:18 > 收件人:"user-zh" > 主题:FlinkSQL如何处理上游的表结构变更 > > > > Hi all: > flink version : 1.11.0 > 场景:上游的数据来自binlog,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create > table时写死的,有什么办法可以处理这种场景呢 -- Best, Benchao Li
Re: 回复:请指教一个关于时间窗的问题,非常感谢!
+ > "from (select appid,eventid,count(*) as cnt," + > "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + > "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + > "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' > HOUR),TIME '00:00:00')"; > > > Table table = tenv.sqlQuery(sql); > DataStream dataStream = tenv.toAppendStream(table, Result.class); > dataStream.print(); > > env.execute("etl.exception.monitor.ExceptionAlertHour"); > } > > > > public static class Result{ > private String appid; > private String eventid; > private long cnt; > private Timestamp stime; > private Timestamp etime; > public String getAppid() { > return appid; > } > > public void setAppid(String appid) { > this.appid = appid; > } > > public String getEventid() { > return eventid; > } > > public void setEventid(String eventid) { > this.eventid = eventid; > } > > public long getCnt() { > return cnt; > } > > public void setCnt(long cnt) { > this.cnt = cnt; > } > > > public Timestamp getStime(){ > return stime; > } > > public void setStime(Timestamp stime){ > this.stime = stime; > } > > public Timestamp getEtime(){ > return etime; > } > > public void setEtime(Timestamp etime){ > this.etime = etime; > } > > @Override > public String toString(){ > return "ResultHour{" + > "appid=" + appid + > ",eventid=" + eventid + > ",cnt=" + cnt + > ", stime=" + stime + > ", etime=" + etime + > ", SystemTime=" + System.currentTimeMillis() + > '}'; > } > } > > } > > > 发件人: jacky-cui > 发送时间: 2020-09-02 18:58 > 收件人: user-zh > 主题: 回复:请指教一个关于时间窗的问题,非常感谢! > 你这个flink是什么版本,能贴全代码吗 > > > --原始邮件-- > 发件人: > "user-zh" > < > samuel@ubtrobot.com; > 发送时间:2020年9月2日(星期三) 下午3:20 > 收件人:"user-zh" > 主题:请指教一个关于时间窗的问题,非常感谢! > > > > 大牛好:使用flink SQL,希望可以通过tumble window(每小时)来计算,现在遇到问题是到整点时,是没有触发计算的,请帮忙看看! > > //指定eventtime字段及生成watermark > DataStream withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( > WatermarkStrategy > > . //. .withIdleness(Duration.ofSeconds(10)) //即时没数据时,也生成watermark > .withTimestampAssigner((event, timestamp)-event.f3)); > > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > tenv.registerDataStream( > "log", > withTimestampsAndWatermarksDS, > "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); > > String sql = "select appid,eventid,cnt," + > > "(starttime + interval '8' hour ) as stime," + > > "(endtime + interval '8' hour ) as etime " > + > "from > (select appid,eventid,count(*) as cnt," + > > "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + > > "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + > "from > log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME > '00:00:00')"; //希望整点结束时触发时间窗关闭 > > Table table = tenv.sqlQuery(sql); > DataStream Result.class); > > 输出的结果是: > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > 期待的是2020-09-01 18:00:00.0结束时触发关闭窗口,结果是没有的。 > (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 > 等到这条数据上来后才触发 > ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 > 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 > //2020/9/2 15:23:35} > 请问一下哪里出了问题?万分感谢! > -- Best, Benchao Li
Re: flink json ddl解析
Hi, 如果声明为 ARRAY 是否可以满足你的需求呢?如果可以的话,你可以在 1.12之后使用这个feature[1]. [1] https://issues.apache.org/jira/browse/FLINK-18002 zilong xiao 于2020年9月1日周二 下午5:49写道: > 问题大概懂了,坐等Flink大佬回复 > > Dream-底限 于2020年9月1日周二 下午4:43写道: > > > hi > > 就是json数组如果是这种:[1,2,3],我可以直接array解析 > > > > > 如果json数组是这种:[1,"test",true],如果我用array>程序是没办法运行的,如果我用array > int,b string,c boolean>>,flink做ddl翻译解析json的时候会把row > boolean>这一部分映射为解析jsonobject,但是array元素不是jsonobject会导致取不到数据 > > > > zilong xiao 于2020年9月1日周二 下午4:04写道: > > > > > 基本类型包装一层会导致解析不出来 这个没太明白,可以举个列子吗? > > > > > > Dream-底限 于2020年9月1日周二 下午2:20写道: > > > > > > > hi、 > > > > > > > > > > 我先前也想这样用,但后来发现ddl中的row对应json中的object,基本类型包装一层会导致解析不出来,感觉应该在ddl加一个类型映射一下这种情况 > > > > > > > > zilong xiao 于2020年9月1日周二 上午11:47写道: > > > > > > > > > like this: ARRAY > > > String>>> > > > > > > > > > > Dream-底限 于2020年9月1日周二 上午11:40写道: > > > > > > > > > > > hi > > > > > > > > > > > > > > > > > > > > > > > > > > > 我正在解析json数组,在解析的时候遇到一个问题,当解析的json数组元素为同一类型的时候,我可以使用ddl的array进行存储,但是当json数组元素为不同类型的时候,我没办法做ddl映射,我查看JsonRowSchemaConverter解析json > > > > > > > array的时候,对于不同类型的数组元素解析后可以用row存储,但请问我在ddl时候要怎么做,因为在DDL用row表示数组会抛出异常 > > > > > > > > > > > > > > > > > > private static TypeInformation convertArray(String location, > > > > > > JsonNode node, JsonNode root) { > > > > > >// validate items > > > > > >if (!node.has(ITEMS)) { > > > > > > throw new IllegalArgumentException( > > > > > > "Arrays must specify an '" + ITEMS + "' property in > node: > > " > > > + > > > > > > location); > > > > > >} > > > > > >final JsonNode items = node.get(ITEMS); > > > > > > > > > > > >// list (translated to object array) > > > > > >if (items.isObject()) { > > > > > > final TypeInformation elementType = convertType( > > > > > > location + '/' + ITEMS, > > > > > > items, > > > > > > root); > > > > > > // result type might either be ObjectArrayTypeInfo or > > > > > > BasicArrayTypeInfo for Strings > > > > > > return Types.OBJECT_ARRAY(elementType); > > > > > >} > > > > > >// tuple (translated to row) > > > > > >else if (items.isArray()) { > > > > > > final TypeInformation[] types = convertTypes(location + > > '/' > > > + > > > > > > ITEMS, items, root); > > > > > > > > > > > > // validate that array does not contain additional items > > > > > > if (node.has(ADDITIONAL_ITEMS) && > > > > > > node.get(ADDITIONAL_ITEMS).isBoolean() && > > > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) { > > > > > > throw new IllegalArgumentException( > > > > > > "An array tuple must not allow additional items in > > node: > > > " > > > > > > + location); > > > > > > } > > > > > > > > > > > > return Types.ROW(types); > > > > > >} > > > > > >throw new IllegalArgumentException( > > > > > > "Invalid type for '" + ITEMS + "' property in node: " + > > > > location); > > > > > > } > > > > > > > > > > > > > > > > > > > > > -- Best, Benchao Li
Re: flink1.11时间函数
不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。 Dream-底限 于2020年8月28日周五 下午2:50写道: > hi > > UNIX_TIMESTAMP() > > NOW() > > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗 > -- Best, Benchao Li
Re: Flink 维表延迟join
这种场景是不是可以直接用批的方式来处理呢?那就不需要维表了,正常join即可, 这样可以用到批里面一些特有的join优化。 魏烽 于2020年8月28日周五 上午9:58写道: > 各位好: > > > 现在有一个应用场景是使用流的方式读取hdfs文件进行处理(StreamEnv.readTextFile),实际可以看成是批处理,现需要进行维表join,维表不会变更,现有两种方案: > > 1.直接将维表一次性加载到内存进行join; > > 2.使用mysql或者hbase外部存储每条数据进行查询join; > > 但是方案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 > > 请问各位有什么更好的建议嘛?感谢 > > 原始邮件 > 发件人: Leonard Xu > 收件人: Jark Wu > 抄送: user-zh; Benchao Li > 发送时间: 2020年8月27日(周四) 20:11 > 主题: Re: Flink 维表延迟join > > > 多谢 Jark 提议 > > Issue[1] 建好了, 大家可以在issue下讨论。 > > 祝好 > Leonard > [1] https://issues.apache.org/jira/browse/FLINK-19063 < > https://issues.apache.org/jira/browse/FLINK-19063> > > > > 在 2020年8月27日,19:54,Jark Wu mailto:imj...@gmail.com>> > 写道: > > > > @Leonard 可以先建个 issue,收集下大家的需求,大家也可以在 issue 下讨论下解决思路。 > > > > On Thu, 27 Aug 2020 at 11:12, Leonard Xu xbjt...@gmail.com> <mailto:xbjt...@gmail.com<mailto:xbjt...@gmail.com>>> > wrote: > > > > Hi, all > > > > 看起来维表延迟join是一个common case, 我在邮件列表里看到蛮多小伙伴反馈了, > > 感觉可以考虑支持下 维表 延迟 join,大家可以一起分享下主要的业务场景吗? > > > > Best > > Leonard > > > > > 在 2020年8月27日,10:39,china_tao taochangl...@163.com> <mailto:taochangl...@163.com taochangl...@163.com>>> 写道: > > > > > > 一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left > > > > join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。 > > > 个人推荐先用null存储,后期etl补录。 > > > > > > > > > > > > -- > > > Sent from: http://apache-flink.147419.n8.nabble.com/ < > http://apache-flink.147419.n8.nabble.com/> > > > > > > -- Best, Benchao Li
Re: [ANNOUNCE] New PMC member: Dian Fu
Congratulations Dian! Cranmer, Danny 于2020年8月27日周四 下午10:55写道: > Congratulations Dian! :D > > On 27/08/2020, 15:25, "Robert Metzger" wrote: > > CAUTION: This email originated from outside of the organization. Do > not click links or open attachments unless you can confirm the sender and > know the content is safe. > > > > Congratulations Dian! > > On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu > wrote: > > > Congratulations Dian > > Best, > > Congxian > > > > > > Xintong Song 于2020年8月27日周四 下午7:50写道: > > > > > Congratulations Dian~! > > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > > > > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu wrote: > > > > > > > Congratulations Dian! > > > > > > > > Best, > > > > Jark > > > > > > > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu > wrote: > > > > > > > > > Congrats, Dian! Well deserved. > > > > > > > > > > Best > > > > > Leonard > > > > > > > > > > > 在 2020年8月27日,19:34,Kurt Young 写道: > > > > > > > > > > > > Congratulations Dian! > > > > > > > > > > > > Best, > > > > > > Kurt > > > > > > > > > > > > > > > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li < > lirui.fu...@gmail.com> > > > wrote: > > > > > > > > > > > >> Congratulations Dian! > > > > > >> > > > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei < > yuanmei.w...@gmail.com> > > > > > wrote: > > > > > >> > > > > > >>> Congrats! > > > > > >>> > > > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang < > hxbks...@gmail.com > > > > > > > > wrote: > > > > > >>> > > > > > >>>> Congratulations Dian! > > > > > >>>> > > > > > >>>> Best, > > > > > >>>> Xingbo > > > > > >>>> > > > > > >>>> jincheng sun 于2020年8月27日周四 > 下午5:24写道: > > > > > >>>> > > > > > >>>>> Hi all, > > > > > >>>>> > > > > > >>>>> On behalf of the Flink PMC, I'm happy to announce that > Dian Fu > > is > > > > now > > > > > >>>>> part of the Apache Flink Project Management Committee > (PMC). > > > > > >>>>> > > > > > >>>>> Dian Fu has been very active on PyFlink component, > working on > > > > various > > > > > >>>>> important features, such as the Python UDF and Pandas > > > integration, > > > > > and > > > > > >>>>> keeps checking and voting for our releases, and also has > > > > successfully > > > > > >>>>> produced two releases(1.9.3&1.11.1) as RM, currently > working as > > > RM > > > > > to push > > > > > >>>>> forward the release of Flink 1.12. > > > > > >>>>> > > > > > >>>>> Please join me in congratulating Dian Fu for becoming a > Flink > > PMC > > > > > >>>>> Member! > > > > > >>>>> > > > > > >>>>> Best, > > > > > >>>>> Jincheng(on behalf of the Flink PMC) > > > > > >>>>> > > > > > >>>> > > > > > >> > > > > > >> -- > > > > > >> Best regards! > > > > > >> Rui Li > > > > > >> > > > > > > > > > > > > > > > > > > > > > -- Best, Benchao Li
Re: Flink 维表延迟join
Hi, 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 郑斌斌 于2020年8月27日周四 上午9:23写道: > 小伙伴们: > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > Thanks -- Best, Benchao Li
Re: 关于sink失败 不消费kafka消息的处理
Hi Eleanore,shizk233 同学给出的解释已经很全面了。 对于你后面提的这个问题,我感觉这个理解应该不太正确。 开了checkpoint之后,虽然kafka producer没有用两阶段提交,但是也可以保证在checkpoint成功的时候 会将当前的所有数据flush出去。如果flush失败,那应该是会导致checkpoint失败的。所以我理解这里应该是 at least once的语义,也就是数据可能会重复,但是不会丢。 Eleanore Jin 于2020年8月27日周四 上午9:53写道: > Hi shizk233, > > 非常感谢你的回答! 如果是如下场景:我的DAG 就是从kafka source topic 读取数据, 然后写到kafka sink topic, > 中间没有其他stateful operator. 如果sink operator 不是两端提交,就是kafka producer send, > 那么如果开启checkpoint, state 就只是source operator kafka offset. > > 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有, > 这个时候source operator 成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6. > 假如这个时候publish message 4 失败了, 那么job restart from last successful checkpoint, > source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗 > > 谢谢! > Eleanore > > On Wed, Aug 26, 2020 at 9:32 AM shizk233 > wrote: > > > Hi Eleanore,这个问题我可以提供一点理解作为参考 > > > > 1.chk与at least once > > checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度, > > 然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。 > > > > 2. sink2PC > > 在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的, > > 否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果 > > 在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚,这样就能保证一致性。 > > > > 这样的话 chk就是at least once,chk+upsert或者chk+2pc就是exactly once了。 > > > > 3.kafka auto commit > > chk快照的state不仅仅是source的offset,还有数据流中各个算子的state,chk机制会在整个数据流完成同一个chk > > n的时候才提交offset。 > > kafka auto commit不能保障这种全局的一致性,因为auto commit是自动的,不会等待整个数据流上同一chk n的完成。 > > > > Eleanore Jin 于2020年8月26日周三 下午11:51写道: > > > > > Hi Benchao > > > 可以解释一下为什么sink没有两阶段提交,那就是at least once 的语义吗? 比如source和 sink 都是kafka, 如果 > > sink > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto > commit > > > offset 看起来似乎没有什么区别 > > > > > > 可否具体解释一下? 谢谢! > > > > > > Eleanore > > > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li > wrote: > > > > > > > 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。 > > > > > > > > 范超 于2020年8月26日周三 上午11:38写道: > > > > > > > > > 大家好,我现在有个疑问 > > > > > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库; > > > > > > > > > > > > > > > > > > > > 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢? > > > > > > > > > > > > > > > 多谢大家了 > > > > > > > > > > 范超 > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, > > > > Benchao Li > > > > > > > > > > -- Best, Benchao Li
Re: 关于sink失败 不消费kafka消息的处理
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。 范超 于2020年8月26日周三 上午11:38写道: > 大家好,我现在有个疑问 > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库; > > 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢? > > > 多谢大家了 > > 范超 > -- Best, Benchao Li
Re: flink1.11 sql问题
Hi, 这个功能已经在1.12支持了[1],如果着急使用,可以cherry-pick回去试试看。 用法就是直接把这个字段声明为varchar,json format会帮你自动处理 [1] https://issues.apache.org/jira/browse/FLINK-18002 酷酷的浑蛋 于2020年8月25日周二 下午6:32写道: > > > 还没到udf那一步,直接用create table的方式,过来的数据就是获取不到值的, > CREATE TABLE test ( > a VARCHAR, > b INT > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'test', > 'properties.bootstrap.servers' = 'xxx', > 'properties.group.id' = 'groupid', > 'scan.startup.mode' = 'group-offsets', > 'format'='json' > ); > > > > > 在2020年08月25日 16:14,Jim Chen 写道: > 这个需要你自定义UDF > > 酷酷的浑蛋 于2020年8月25日周二 下午3:46写道: > > 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗 > > > > > 在2020年08月25日 15:34,taochanglian 写道: > flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。 > > 在 2020/8/25 14:59, 酷酷的浑蛋 写道: > 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写? > > > > > 在2020年08月25日 14:05,酷酷的浑蛋 写道: > 我知道了 > > > > > 在2020年08月25日 13:58,酷酷的浑蛋 写道: > > > > > flink1.11 > > 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理? > > -- Best, Benchao Li
Re: Elasticsearch 写入问题
你指的是用json format么?这个是json format的问题,当前的确是会把所有字段都写进去,不管是否为null。 我们内部也有类似需求,我们是修改了json format,可以允许忽略null的列。 于2020年8月23日周日 下午6:11写道: > 在flink1.11中使用table sql写入时,有一些字段为空时,依然被写入到elasticsearch,这个方式应该不太恰当。 > > > -- Best, Benchao Li
Re: 1.11版本,关于TableEnvironment.executeSql("insert into ..."),job名称设置的问题
FYI: 有一个issue[1] 正在跟进和解决这个问题 [1] https://issues.apache.org/jira/browse/FLINK-18545 Zou Dan 于2020年8月23日周日 下午2:29写道: > 据我所知,这种执行方式目前没法设置 jobName > > > 2020年8月21日 上午11:11,Asahi Lee <978466...@qq.com> 写道: > > > > 你好! > > 我通过表环境执行insert into语句提交作业,我该如何设置我的job名称呢? > > > > > > 程序: > > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().build(); > > TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); > > > > String sourceDDL = "CREATE TABLE datagen ( " + > >" f_random INT, " + > >" f_random_str STRING, " + > >" ts AS localtimestamp, " + > >" WATERMARK FOR ts AS ts " + > >") WITH ( " + > >" 'connector' = 'datagen', " + > >" 'rows-per-second'='10', " + > >" 'fields.f_random.min'='1', " + > >" 'fields.f_random.max'='5', " + > >" 'fields.f_random_str.length'='10' " + > >")"; > > > > bsTableEnv.executeSql(sourceDDL); > > Table datagen = bsTableEnv.from("datagen"); > > > > System.out.println(datagen.getSchema()); > > > > String sinkDDL = "CREATE TABLE print_table (" + > >" f_random int," + > >" c_val bigint, " + > >" wStart TIMESTAMP(3) " + > >") WITH ('connector' = 'print') "; > > bsTableEnv.executeSql(sinkDDL); > > > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), > TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by > TUMBLE(ts, INTERVAL '5' second), f_random"); > > bsTableEnv.executeSql("insert into print_table select * from " + table); > > > -- Best, Benchao Li
Re: Re: 如何设置FlinkSQL并行度
Hi forideal, 我在本地试了一下,没有复现你说的这个情况。 我看代码也没有这个逻辑,如果是没有分配到partition,应该是会阻塞住,而不是finish。 你这个测试用的是社区的版本么?还是有什么特殊的改动? forideal 于2020年8月21日周五 下午11:43写道: > Hi 赵一旦, > 基础信息:使用 watermark for 语法设置watermark,Flink SQL,Blink planner,Flink 1.10.0 > 我最近做了一个实验,将Flink SQL 的并发设置为 kafka topic partition 的 2 倍,同时设置 idle 的时间为 10s。 > 这时,1.source 会有一半的partition 立马就 finished > 2.下游的 workmark 变成了LONG的最大值 > 整个任务都无法正常运行了。 > > > Best forideal > > > > > 在 2020-08-17 10:05:48,"Zhao,Yi(SEC)" 写道: > >我这边才研究FlinkSQL没几天。不过按照目前了解,是不支持算子级别并行度设置的。 > > >此外你说的checkpoint无法正常触发,我估计是因为barrier的问题,部分并行示例没有分区数据,导致没数据就可能导致。这个问题类似,可能无解。 > > > >非要解决可以写代码,把souce部分不使用sql实现。 > >__ > > > >在 2020/8/15 下午8:21,“forideal” 写入: > > > >Hi 赵一旦, > > > > > >目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。 > >1.并行度超过 topic partition 的时候会造成资源浪费 > >2.并行度超过 topic partition 后,checkpoint 也无法正常触发了 > > > > > >Best forideal > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2020-08-14 12:03:32,"赵一旦" 写道: > >>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗? > >> > >>Xingbo Huang 于2020年8月14日周五 下午12:01写道: > >> > >>> Hi, > >>> > >>> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度 > >>> > >>> Best, > >>> Xingbo > >>> > >>> Zhao,Yi(SEC) 于2020年8月14日周五 上午10:49写道: > >>> > >>> > > 并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。 > >>> > > >>> > 发件人: "Zhao,Yi(SEC)" > >>> > 日期: 2020年8月13日 星期四 上午11:44 > >>> > 收件人: "user-zh@flink.apache.org" > >>> > 主题: 如何设置FlinkSQL并行度 > >>> > > >>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。 > >>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢? > >>> > > >>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。 > >>> > > >>> > > >>> > > > > > -- Best, Benchao Li
Re: JDBC connector 写入 mysql 每秒大约只有 200KB 是正常的吗
每秒1多条不算少了吧,如果还想再高一些,可以提高一下sink.buffer-flush.max-rows配置,默认是100 LittleFall <1578166...@qq.com> 于2020年8月20日周四 下午7:56写道: > 这是我的代码,它仅仅把数据从 datagen source 写入到了 jdbc sink. > > ```java > package main; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > > public class Main { > > public static void main(String[] args) { > StreamTableEnvironment tEnv = StreamTableEnvironment.create( > StreamExecutionEnvironment.getExecutionEnvironment(), > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > ); > > tEnv.executeSql("CREATE TABLE gen_stuff (\n" + > "\tstuff_id int,\n" + > "\tstuff_base_id int,\n" + > "\tstuff_name varchar(20)\n" + > ") WITH (\n" + > " 'connector' = 'datagen'," + > "'rows-per-second'='1000'," + > "'fields.stuff_id.kind'='sequence'," + > "'fields.stuff_id.start'='1'," + > "'fields.stuff_id.end'='1000'," + > "'fields.stuff_name.length'='15'" + > ")" > ); > tEnv.executeSql("CREATE TABLE result_stuff (\n" + > "\tstuff_id int,\n" + > "\tstuff_base_id int,\n" + > "\tstuff_name varchar(20)\n" + > ") WITH (\n" + > "\t'connector' = 'jdbc',\n" + > "\t'url'= > 'jdbc:mysql://127.0.0.1:3306/test?rewritebatchedstatements=true',\n" + > "\t'table-name' = 'result_stuff',\n" + > "\t'username' = 'root',\n" + > "\t'password' = ''\n" + > ")" > ); > > tEnv.executeSql("insert into result_stuff select stuff_id, > stuff_base_id, stuff_name from gen_stuff"); > } > } > ``` > > 然而,mysql 每秒大约只多 1 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。 > > 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。 > > 我使用的和 jdbc 有关的依赖如下: > > ```xml > > org.apache.flink > > flink-connector-jdbc_${scala.binary.version} > ${flink.version} > > > mysql > mysql-connector-java > 8.0.21 > > ``` > > (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能
不同的key应该用的是同一个state的实例,但是state下面会处理不同的key对应的state,也就是key对于用户来说是透明的。 比如你用一个MapState,那就是约等于每个key都有一个Map实例,不同key之间是独立的。 shizk233 于2020年8月20日周四 下午5:03写道: > 谢谢大佬解答。 > 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例, > 那么数据流上的不同key(不是map里的hash key)是怎么保证只获取到对应的那部分map state数据呢? > > 按我的理解,按key分流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。 > > Benchao Li 于2020年8月20日周四 下午4:40写道: > > > Hi, > > > > 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。 > > > > shizk233 于2020年8月20日周四 下午2:22写道: > > > > > Hi all, > > > > > > 请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。 > > > 问题1: > > > 如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系? > > > 还是这两个方法是顺序执行的? > > > > > > 问题2: > > > 虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的 > > > Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗? > > > > > > > > > -- > > > > Best, > > Benchao Li > > > -- Best, Benchao Li
Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能
Hi, 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。 shizk233 于2020年8月20日周四 下午2:22写道: > Hi all, > > 请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。 > 问题1: > 如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系? > 还是这两个方法是顺序执行的? > > 问题2: > 虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的 > Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗? > -- Best, Benchao Li
Re: flink 1.11 web ui请教
Hi, 因为目前的维表Join实现本身是没有用shuffle的,也就是维表算子跟上面的算子的连接方式为forward。 其他的join,都是直接按照join的key进行hash的,所以跟前面的算子的链接方式为hash。 于2020年8月19日周三 下午3:33写道: > 版本:1.11 > 部署:standalone > > 数据从kafka写到kafka > 1.提交的任务包含两个维表join和两条insert > 语句,但是在中间的执行图,只有一个方框。其他有些join任务会有不同的框,用hash连线表示。这是什么原因? > > 2.底下的 records received等几个列都是0。怎么样才会统计? > > > -- Best, Benchao Li
Re: flink 1.11 order by rowtime报错
Hi 斌斌, 感觉你应该是遇到了一个已知的bug[1] [1] https://issues.apache.org/jira/browse/FLINK-16827 郑斌斌 于2020年8月19日周三 下午1:20写道: > > 报下面的这个错误,并行度设置为1就没有问题了,不知道为什么 > > java.lang.NullPointerExcpetion > at > org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) > at > org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) > at > org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:101) > > -- > 发件人:china_tao > 发送时间:2020年8月19日(星期三) 00:17 > 收件人:user-zh > 主 题:Re: flink 1.11 order by rowtime报错 > > 错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > > -- Best, Benchao Li
Re: hive Streaming Reading 无法分组统计
Hi, 你的hint的用法应该不太对,可以参考下文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints.html 18579099...@163.com <18579099...@163.com> 于2020年8月19日周三 上午12:17写道: > SELECT > id, > count(1) > FROM > hive_user_parquet > GROUP BY > id > /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-08-18') */ > 通过分组统计好像是会报语法错误的,这是什么原因造成的呢 > > > > 18579099...@163.com > -- Best, Benchao Li
Re: Print SQL connector无法正常使用
Hi, 看起来你用的hbase的配置还是老的配置,1.11中已经更新的新的connector配置选项了, 你可以尝试下用新版的connector配置[1]。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html xiao cai 于2020年8月17日周一 上午11:52写道: > Hi All: > 目前使用flink sql的Print SQL connector,想要将查询的结果打印出来,结果报错: > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > Could not find a suitable table factory for > 'org.apache.flink.table.factories.TableSinkFactory' in > the classpath. > > > 可以保证:HBase-connector是在lib包下存在的,是否我还需要在lib下添加什么依赖? > > > 下面为执行的sql: > > > CREATE TABLE dimension ( > rowKey STRING, > cf ROW, > tas BIGINT > ) WITH ( > 'connector.type' = 'hbase', > 'connector.version' = '1.4.3', > 'connector.table-name' = ’test', > 'connector.write.buffer-flush.max-rows' = '10', > 'connector.zookeeper.quorum' = ‘IP:port', > 'connector.zookeeper.znode.parent' = '/hbase', > ); > > > CREATE TABLE print_table ( > f0 STRING, > f1 INT, > f2 BIGINT, > f3 BIGINT > ) WITH ( > 'connector' = 'print' > ); > > > insert into print_table > select rowKey, cf.age, cf.area, tas > from dimension -- Best, Benchao Li