Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
tion=[hash[uid, > uname]]) > nbsp; nbsp; nbsp; nbsp; nbsp;+- > Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))]) > nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; +- > TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], > fields=[uid, uname, dt, uage]) > > > > 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li

Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
catalog, default_database, kafkaTable]], > fields=[uid, uname, dt, uage]) > > > > 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢 -- Best, Benchao Li

Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 文章 Benchao Li
lable, and accurate data > > > > streaming > > > > >> applications. > > > > >>>> > > > > >>>> The release is available for download at: > > > > >>>> https://flink.apache.org/downloads.html > > > > >>>> > > > > >>>> Please check out the release blog post for an overview of the > > > > >> improvements for this bugfix release: > > > > >>>> > > > > >> > > > > > > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/ > > > > >>>> > > > > >>>> The full release notes are available in Jira: > > > > >>>> > > > > >> > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282 > > > > >>>> > > > > >>>> We would like to thank all contributors of the Apache Flink > > > community > > > > >> who made this release possible! > > > > >>>> > > > > >>>> > > > > >>>> Best, > > > > >>>> Yun, Jing, Martijn and Lincoln > > > > >> > > > > > > > > > > > > > > > > > -- > > Best, > > Hangxiang. -- Best, Benchao Li

Re: 急 [FLINK-34170] 何时能够修复?

2024-03-14 文章 Benchao Li
enix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤 > > > https://issues.apache.org/jira/browse/FLINK-34170 -- Best, Benchao Li

Re: RE: lock up表过滤条件下推导致的bug

2023-12-25 文章 Benchao Li
wrote: > >> 我的sql如下: > >> 、 > >> > >> > >> t_purch_apply_sent_route 是通过flink cdc创建的 > >> t_purch_apply_sent_route_goods 是普通的jdbc > >> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据 > >> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推 > >> 这应该算是bug吧,或者要满足我的预期,该怎么写sql? > >> > >> > >> > >> -- Best, Benchao Li

Re: Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-30 文章 Benchao Li
with existing > Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on. > 这里有提到Beeline工具,难道不是 beeline> !connect jdbc:flink://localhost:8083 这样的连接方式了? > > > > > > > > > > > > > > > > > > > > 在 2023-10-30 11:27:1

Re: flink 1.18.0 使用 hive beeline + jdbc driver连接sql gateway失败

2023-10-29 文章 Benchao Li
. . . . . . .> 'format' = 'csv' > . . . . . . . . . . . . . . . . . . . .> ); > Failed to create the executor. > Connection is already closed. > -- Best, Benchao Li

Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 文章 Benchao Li
applications. > > >>>>>> > > >>>>>> The release is available for download at: > > >>>>>> https://flink.apache.org/downloads.html > > >>>>>> > > >>>>>> Please check out the release blog post for an overview of the > > >>>>> improvements > > >>>>>> for this release: > > >>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/ > > >>>>>> > > >>>>>> The full release notes are available in Jira: > > >>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885 > > >>>>>> > > >>>>>> We would like to thank all contributors of the Apache Flink > > >> community > > >>>> who > > >>>>>> made this release possible! > > >>>>>> > > >>>>>> Best regards, > > >>>>>> Konstantin, Qingsheng, Sergey, and Jing > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > > -- Best, Benchao Li

Re: GenericRowData与BinaryRowData的转换

2023-03-13 文章 Benchao Li
ata > toString,BinaryRowData没有实现该方法QQAQ > > Benchao Li 于2021年4月9日周五 10:42写道: > > > GenericRowData和BinaryRowData都是RowData这个接口的具体实现。 > > 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。 > > > > 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData, > > 比如典型的就是序列化的时候都会按照BinaryRowData来

Re: Flink SQL Client 解析 Protobuf

2022-07-03 文章 Benchao Li
格式,Flink SQL Client 没有直接支持。 > > Flink SQL Client 有一个 Raw 格式选项,不过我们还没有找到如何使用这个的文档; > > 看看大家有没有相关的经验可以分享 > > 多谢 > -- Best, Benchao Li

Re: Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。 casel.chen 于2022年1月7日周五 07:42写道: > mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 > > > > > > > > > > > > > > > > > > 在 2022-01-06 20:43:00,"Bench

Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
>> > >> > >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > >> > >> > >> > >> INSERT INTO mysql_sink_table > >> > >> SELECT category, dt, LAST_VALUE(total) > >> > >> OVER ( > >> > >> PARTITION BY category > >> > >> ORDER BY PROCTIME() > >> > >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > >> > >> ) AS var1 > >> > >> FROM ( > >> > >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > >> > >> ); > -- Best, Benchao Li

Re: FlinkSQL Source和Sink的Operator name为什么格式不同

2021-10-02 文章 Benchao Li
ds_k], fields=[id, > > name]) > > Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id, > > name])) > > > > > > TableSourceScan 和 Sink相比多了个 中括号,并且采用 ',' 分割名字功空间,这是为什么 > > > -- Best, Benchao Li

Re: flink-1.12.0 流模式 使用 lag问题

2021-09-21 文章 Benchao Li
mat' = 'json' > ); > > > > SELECT > user_id, > item_id, > behavior, > next_bv > FROM > ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER > BY proctime ) AS next_bv FROM KafkaTable ) t; -- Best, Benchao Li

Re: Flink 从checkpoint恢复时,部分状态没有正确恢复

2021-08-30 文章 Benchao Li
这个问题已经在1.12中修复了,参考: https://issues.apache.org/jira/browse/FLINK-18688 Benchao Li 于2021年8月30日周一 下午7:49写道: > Hi xingxing, > > 看起来你可能也遇到了这个bug了。 > 我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化 > 的结果不稳定,进而导致状态恢复会错误。 > 先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、li

Re: Flink 从checkpoint恢复时,部分状态没有正确恢复

2021-08-30 文章 Benchao Li
; 'article_auto_video_play_click'))a > groupby platform, type, `time`; > > > 期待大家的帮助与回复,希望能给些问题排查的思路! > > > > -- Best, Benchao Li

Re: 分组滚动窗口 无法触发计算,由于 watermark 没有被生成,或者被计算。

2021-04-13 文章 Benchao Li
MBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`, > >> `dim`, > >> count(1), > >> FROM input_table > >> GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim` > >> > >> > >> > >> *Best Regards* > >> *Jeremy Mei* > >> > > > > > > -- > > > > *Best Regards* > > *Jeremy Mei* > > > > > -- > > *Best Regards* > *Jeremy Mei* > -- Best, Benchao Li

Re: GenericRowData与BinaryRowData的转换

2021-04-08 文章 Benchao Li
time层把GenericRowData转换BinaryRowData的规则是什么? > -- Best, Benchao Li

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
嗯,是这样的。 datayangl 于2021年3月19日周五 下午5:55写道: > calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配 > 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
> is > not null 可以正常运行并一直产生数据。 > > 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flink sql 的 count(distinct )问题

2021-03-16 文章 Benchao Li
Hi, 你可以理解为用的是MapState来保存的状态。 op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道: > 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态 -- Best, Benchao Li

Re: UDF 重复调用的问题、

2021-03-02 文章 Benchao Li
ture 社区有规划了吗? > > > Benchao Li 于2021年3月3日周三 上午10:23写道: > > > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。 > > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。 > > > > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在 > > plan的过程中会将表达式完全展开,比如下面的SQL: > > ```SQL > > SELECT my_map['

Re: UDF 重复调用的问题、

2021-03-02 文章 Benchao Li
`dump_json_to_map`这个函数执行3次。 HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道: > 为什么4次是没问题的,感觉只执行一次才是最优的 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: 关于 stream-stream Interval Join 的问题

2020-12-10 文章 Benchao Li
> """.stripMargin) > > > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到 > > > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source > > subtask的watermark。 > > --- >

Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
反压的话,你可以重点看下你使用的是什么state backend, 如果是filesystem,那状态就是放heap的,这种你需要重点看下gc相关的问题; 如果是rocksdb,这种状态是直接序列化到rocksdb中了,一般很少有内存问题,更多的是IO问题,或者CPU瓶颈。 你可以按照找个思路排查一下。 song wang 于2020年12月10日周四 上午11:38写道: > hi,Benchao, > 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢! > > Benchao Li 于2020年12月10日

Re: interval join 时checkpoint失败

2020-12-09 文章 Benchao Li
ig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > 各位大佬能否给些排查建议呢? > > > > > > > -- Best, Benchao Li

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 Benchao Li
2)此外,还有一个点,这个我也不确认。如果是datastream > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 > > input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。 > > macia kk 于2020年12月9日周三 上午1:17写道: > > > @Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到 >

Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 Benchao Li
MySQL DB 所有的 Binlog 打进去了。我的 > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order > > > item > > > > >信息,所以 我用: > > > > > > > > > > SELECT * > > > > > FROM A > > > > > LEFT OUT JOIN B > > > > > ON order_id > > > > > Where A.event_time > B.event_time + 30 s > > > > > A.event_time > B.event_time - 30 s > > > > > > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark > > > > Structural > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join, > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题? > > > > > > > > > > -- Best, Benchao Li

Re: flink-json 函数用法

2020-11-29 文章 Benchao Li
使用的默认的blink), > 但是总是报错函数的用法不对,有相关资料来介绍这些函数的使用方法的吗?或者示例 > > > > 闫云鹏 > > DXM 支付业务部 > > 地址:北京市海淀区西北旺东路度小满金融总部 > > 邮编:100085 > > 手机:13693668213 > > 邮箱: > > > yanyunpeng@ > > > mailto: > > > yanyunpeng@ > > > > > > > 度小满金融 > > > > 精于科技 值得信赖 > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > > -- Best, Benchao Li

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
zilong xiao 于2020年11月24日周二 下午4:46写道: > 这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢? > > Benchao Li 于2020年11月24日周二 下午4:33写道: > > > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 > > > > zilong xiao 于2020年11月24日周二 下午4:13写道: > > > > > 用的Flink1.11 不过是用的别人写的format,估计是

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 zilong xiao 于2020年11月24日周二 下午4:13写道: > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, > https://github.com/yangyichao-mango/flink-protobuf > > Benchao Li 于2020年11月24日周二 下午3:43写道: > > > 看起来你的DDL写的没有什么问题。 > > > > 你用的

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
看起来你的DDL写的没有什么问题。 你用的是哪个Flink版本呢? 此外就是可以发下更完整的异常栈么? zilong xiao 于2020年11月24日周二 下午2:54写道: > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > zilong xiao 于2020年11月24日

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 zilong xiao 于2020年11月24日周二 上午10:49写道: > [image: image.png] > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > -- Best, Benchao Li

Re: Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常

2020-10-26 文章 Benchao Li
//apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flinksql 不支持 % 运算

2020-10-26 文章 Benchao Li
current SQL conformance > level > > > 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复 > > > > > 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml -- Best, Benchao Li

Re: 在使用hive catalog的情况下 json format 大小写问题

2020-10-21 文章 Benchao Li
现在的json format就是支持大小写区分的吧。可以提供下你的DDL和样例数据么? 王刚 于2020年10月21日周三 下午8:08写道: > hi~ 各位大佬, > > 由于catalog是不支持大小写 如果kafka 的数据是json格式,且某些json的key区分大小写的, > > 这个时候建的在hive catalog里建的kafka表的某些json字段是取不到的 > > flink 1.11有现成解决的方案可以解决这种问题不 > -- Best, Benchao Li

Re: Re: flink sql ddl 是否支持映射多层json

2020-10-21 文章 Benchao Li
嗯,道理是一样的。ROW/MAP/ARRAY这些本来就是嵌套类型,嵌套深度没有限制 Roc Marshal 于2020年10月21日周三 下午2:38写道: > 如果是深度是三层以上也是类似的嵌套语法吗?或者说是其他的写法? > > > 谢谢 > > Best Roc. > > > > > > 在 2020-09-24 20:53:12,"Benchao Li" 写道: > >这个情况现在是支持的,可以用类似于这种写法: > >```SQL > >CRE

Re: flinkSQL1.11写出数据到jdbc fleld type do not match

2020-10-19 文章 Benchao Li
ming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53) > at > > com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ... 11 more > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: SQL interval join 问题

2020-10-19 文章 Benchao Li
s2.id where > s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime + > INTERVAL '5' SECOND; > > > > > >最后的 join 语句预期是 如果两个source的消息, 先后到达时间超过 10 秒,则输出, > 两条消息。 > > > > > >目前的观察结果是,如果两条消息, 先后到达时间超过10 秒,输出为:, > >为何超过 10 秒后,仍然会输出 ? > -- Best, Benchao Li

Re: 回复: flink 自定义udf注册后不能使用

2020-10-18 文章 Benchao Li
发送时间: 2020年10月16日 6:47 > 收件人: user-zh@flink.apache.org > 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 > > 是的,是我传参有问题 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flinksql如何控制结果输出的频率

2020-10-14 文章 Benchao Li
可以具体描述下你的问题么,没太看懂你的问题。 smallwong 于2020年10月14日周三 下午6:57写道: > 哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
是的,所以应该用createNullableConverter,而不是createConverter 史 正超 于2020年10月14日周三 上午10:45写道: > > 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is > not null. > 代码这样写的前提是,不允许对象的值为null的。 > ____ > 发件人: Benchao Li > 发送时间: 2020年10月14

Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
p类型的value值为null,会报空指针异常的。 > > 发件人: 奔跑的小飞袁 > 发送时间: 2020年10月14日 1:46 > 收件人: user-zh@flink.apache.org > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > other_para MAP > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flink-SQL1.11版本对map类型中value的空指针异常

2020-10-13 文章 Benchao Li
> return avroObject -> { > final Map map = (Map) avroObject; > Map result = new HashMap<>(); > for (Map.Entry entry : map.entrySet()) { > Object key = > keyConverter.convert(entry.getKey()); > Object value = > valueConverter.convert(entry.getValue()); > result.put(key, value); > } > return new GenericMapData(result); > }; > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: Flink的table-api不支持.

2020-10-09 文章 Benchao Li
gt; "FETCH" ... > > "FROM" ... > > "INTERSECT" ... > > "LIMIT" ... > > "OFFSET" ... > > "ORDER" ... > > "MINUS" ... > > "UNION" ... > > "," ... > > > > > > > > Table result = tEnv.sqlQuery("select * from OrderA join OrderB on > > OrderA.user=OrderB.user"); > -- Best, Benchao Li

Re: Re: sql-cli执行sql报错

2020-09-29 文章 Benchao Li
chao的回复是的对的, > 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包 > flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > > > > 相关lib包: > > flink-connector-kafka_2.12-1.10.2.jar > > kafka-clients-0.11.0.3.jar > > 祝好 > Leonard > -- Best, Benchao Li

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-09-29 文章 Benchao Li
并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢? > 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度? -- Best, Benchao Li

Re: 使用异步IO时,数据写入到capacity数后,卡住不再消费source端数据了。

2020-09-29 文章 Benchao Li
ntln("time out ... ") > } > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章 Benchao Li
> > 不太清楚为什么用了mini batch就没读取这个配置。 > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Benchao Li
d":"1", "speed":4.0,"speed_1":3.0, > "speed_2":2.0} > {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0, > "speed_2":3.0} > {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0, > "speed_2":4.0} > > 实际得到的结果数据是: > {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0, > "speed_2":1.0} > {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0, > "speed_2":2.0} > {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0, > "speed_2":3.0} > {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0, > "speed_2":4.0} > {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0, > "speed_2":5.0} > {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0, > "speed_2":6.0} > > 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: sql-cli执行sql报错

2020-09-28 文章 Benchao Li
l*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。 > >> > >> > >> > 相关lib包: > >> > flink-connector-kafka_2.12-1.10.2.jar > >> > kafka-clients-0.11.0.3.jar > >> > >> 祝好 > >> Leonard > > > > > -- Best, Benchao Li

Re: 回复:sql-cli执行sql报错

2020-09-27 文章 Benchao Li
VAL '5' SECOND > topic=heli01 > > The following factories have been considered: > org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > > > > hl9...@126.com > -- Best, Benchao Li

Re: Flink SQL 1.11.1 executeSql/SqlUpdate时 SQL validation的一些问题

2020-09-27 文章 Benchao Li
bsTableEnv.executeSql("select * from a.b") > > > 然后发现了以下现象: > > > 从图中可以得知,在`DatabaseCalciteSchema` 中 > 我发现下面几个奇怪的点 > >1. databaseName 是 ‘'default' >2. getTable将 `a`作为参数传入,而不是b (a是库名,b是表名) > > > > 首先可以确定的是这个发生在validation阶段 > > 其次我发现特意针对这块做了一次catch `TableNotExistException`的操作 > > 请问这部分代码的用途和目的是? > > > > -- Best, Benchao Li

Re: 回复:flink sql count问题

2020-09-27 文章 Benchao Li
nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp; > >gt;发送时间:amp;nbsp;2020年9月27日(星期天) 下午4:59 > >gt;收件人:amp;nbsp;"user-zh" amp;gt;; > >gt; > >gt;主题:amp;nbsp;Re:flink sql count问题 > >gt; > >gt; > >gt; > >gt;最好把null 变成0,amp;nbsp; 你这样amp;nbsp;amp;nbsp; > sum(if(name like '南京%',1 , 0)) > >gt;在 2020-09-27 16:53:56,"zya" 写道: > >gt;amp;gt;请教各位: > >gt;amp;gt;我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录, > >gt;amp;gt;之前在hive中是这么写的:count(if(name like '南京%',1 , > null)),但是flink sql中count不能为null,有什么别的方法能实现该功能吗? > >gt;amp;gt;使用的是flink1.10.1 blink > >gt;amp;gt;amp;amp;nbsp; > -- Best, Benchao Li

Re: Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-27 文章 Benchao Li
变了。 > > > >Michael Ran 于2020年9月27日周日 下午2:39写道: > > > >> 滑动窗口 > >> 在 2020-09-27 13:25:37,"赵一旦" 写道: > >> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢? > >> > -- Best, Benchao Li

Re: 关于flink sql的数据类型

2020-09-25 文章 Benchao Li
基于1.11测试的。目前来看,json format的2个设置都设置好。然后event > time部分使用COALESCE将null情况设置为event_time 0。这么做是最好的情况啦。 > > Benchao Li 于2020年9月25日周五 下午2:08写道: > > > 你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。 > > > > 赵一旦 于2020年9月25日周五 上午11:02写道: > > > > > 而且按照string无法接受"a":a,

Re: 关于flink sql的数据类型

2020-09-25 文章 Benchao Li
如果数据为 t: abc 则数据直接非法被忽略。 > > 如果数据为t: "abc",则t被转为null? > > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。 > > > > > > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。 > > > > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。 > > 我是通过COALESCE解决的。 > > > -- Best, Benchao Li

Re: flink sql ddl 是否支持映射多层json

2020-09-24 文章 Benchao Li
"a13":{ > "a21":1, > "a22":1, > "a23":"1"} > } > > > 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持? > > > 谢谢 -- Best, Benchao Li

Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-24 文章 Benchao Li
> 的tm的内存大小,kafka的partition数目,也无法将作业的吞吐量压上去。 > > > > -邮件原件- > 发件人: Benchao Li [mailto:libenc...@apache.org] > 发送时间: 2020年9月18日 星期五 18:49 > 收件人: user-zh > 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以 > > 提交两个作业的话,两个作业是完全独立的,都会消费全量数据。 > > 一个

Re: flink sql延迟数据

2020-09-24 文章 Benchao Li
@qq.com 于2020年9月23日周三 下午4:24写道: > > hi各位,有个问题请教一下: > 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event > time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为 > WATERMARK FOR ts AS tsnbsp; - INTERVAL '5' SECODND > ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream > api里面可以使用allowed > lateness,但是这部分在sql中没看到有相关语法 > > > Flink版本1.10.1 > nbsp; > > > > -- > > Best, > Benchao Li -- Best, Benchao Li

Re: [DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-24 文章 Benchao Li
+1 nashcen <2415370...@qq.com> 于2020年9月24日周四 下午1:09写道: > +1 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flink sql延迟数据

2020-09-23 文章 Benchao Li
相关语法 > > > Flink版本1.10.1 > -- Best, Benchao Li

Re: flink pb转json性能问题

2020-09-23 文章 Benchao Li
.print(message) > 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json, > 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps. > 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2 > 、社区对pb format 会支持么? > 3、pb转json 有什么性能比较好的工具包 -- Best, Benchao Li

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
是4G。版本1.11.1 > ------ > 发件人:Benchao Li > 发送时间:2020年9月23日(星期三) 10:50 > 收件人:user-zh > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重 > > Hi Tianwang, > > 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加 > > 1. time interval join会将watermark delay之后再发送,也

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Benchao Li
intGCDateStamps -XX:+UseGCLogFileRotation > -XX:NumberOfGCLogFiles=10 > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause" > > > > > > -- > ** > tivanli > ** > -- Best, Benchao Li

Re: 消费kafka source反压

2020-09-21 文章 Benchao Li
这个性能影响指的是跟那种情况进行对比呢? smq <374060...@qq.com> 于2020年9月21日周一 下午6:49写道: > 谢谢,多问一句,并行度为1的话,keyby算子加上keydstate对性能影响大吗 > > > > ---原始邮件--- > 发件人: "Benchao Li" 发送时间: 2020年9月21日(周一) 下午4:39 > 收件人: "user-zh" 主题: Re: 消费kafka source反压 > > > 这种反压一般是下游反压过

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
可以通过SQL的where条件来过滤吧 chuyuan 于2020年9月21日周一 下午6:48写道: > 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
uot; dt string" + > " ) stored as orcfile " + > " TBLPROPERTIES" + > " (" + > > "'partition.time-extractor.kind'='custom'," + > > "'partition.time-extractor.timestamp-pattern'='$dt'," + > > > "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," > + > > "'sink.partition-commit.trigger'='partition-time'," + > > "'sink.partition-commit.delay'='0s'," + > > "'sink.partition-commit.policy.kind'='metastore'" + > ")"); > > > 第三步,把临时表的数据insert into到目标表,此时出现异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > > 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为: > |-- lib: LEGACY('RAW', 'ANY') > |-- properties: LEGACY('RAW', 'ANY') > |-- track_id: BIGINT > |-- type: STRING > ,这说明lib LEGACY('RAW', 'ANY')无法匹配hive目标表中lib > MAP数据结构,写入失败,大概流程是这样。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: 消费kafka source反压

2020-09-21 文章 Benchao Li
这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。 smq <374060...@qq.com> 于2020年9月21日周一 下午2:08写道: > > 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗? -- Best, Benchao Li

Re: Re: [SQL] parse table name from sql statement

2020-09-21 文章 Benchao Li
日周一 上午10:21写道: > > > > > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。 > > >> > > >> > > >> > > >> > > >> > > >> 在 2020-09-21 10:12:13,"Harold.Miao" 写道: > > >> >hi all > > >> > > > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。 > > >> > > > >> >谢谢 > > >> > > > >> >-- > > >> > > > >> >Best Regards, > > >> >Harold Miao > > >> > > > > > > > > >-- > > > > > >Best Regards, > > >Harold Miao > > > > > -- > > Best Regards, > Harold Miao > -- Best, Benchao Li

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
presentation. It needs to be > resolved into a proper raw type. > 方便说下具体实现细节吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-18 文章 Benchao Li
倍的。 > > 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。 > > 求各位大佬指导 > -- Best, Benchao Li

Re: 关于flinksql 滑动窗口数据进不来的问题

2020-09-14 文章 Benchao Li
ksql 滑动窗口数据进不来的问题 > > > > 可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢? > > 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。 > > 李杨烨 <438106...@qq.com 于2020年9月14日周一 下午1:32写道: > > 刚刚邮件图片挂了,上传了新的图片地址: > http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg > 使用rowTime做的滑动 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li

Re: 关于flinksql 滑动窗口数据进不来的问题

2020-09-14 文章 Benchao Li
可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢? 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。 李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午1:32写道: > 刚刚邮件图片挂了,上传了新的图片地址:http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg > 使用rowTime做的滑动 -- Best, Benchao Li

Re: 关于使用flinksql 生成滑动窗口 table数据进不来的问题

2020-09-13 文章 Benchao Li
你好,你的图片挂了,可以把图片放到第三方图床工具然后把链接发出来。或者直接用文本描述的问题。 李杨烨 <438106...@qq.com> 于2020年9月14日周一 上午11:25写道: > > 根据rowTime做的滑动 -- Best, Benchao Li

Re: UDAF函数在over窗口使用问题

2020-09-13 文章 Benchao Li
gt; 'com.binance.risk.flink.udf.AggDistinctDetail'; > > select > realIp , > UDF_InfoDistinctMerge(userId) over w1 as userSet > from source_table > window w1 as (partition by realIp order by requestDateTime asc RANGE > BETWEEN > INTERVAL '24' hour preceding AND CURRENT ROW) ; > > 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。 > > 问题: > 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-10 文章 Benchao Li
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。 last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。 lec ssmi 于2020年9月10日周四 下午2:35写道: > 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗? > 感觉底层和 last_value() group by id是一样的。 > > Benchao

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-09 文章 Benchao Li
金额。那么总金额就是错的。nbsp;nbsp; > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > nbsp; 刚入坑实时处理,请大神赐教 > > > > -- > > Best, > Benchao Li -- Best, Benchao Li

Re: flink实时统计GMV,如果订单金额下午变了该怎么处理

2020-09-08 文章 Benchao Li
GMV实时统计为1000. > 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka. > 这时如果不减去上午已经统计的金额。那么总金额就是错的。 > 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。 > > > 刚入坑实时处理,请大神赐教 -- Best, Benchao Li

Re: how flink-sql connector kafka reads array json

2020-09-07 文章 Benchao Li
eam>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource? > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: FlinkSQL如何处理上游的表结构变更

2020-09-05 文章 Benchao Li
log,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create > table时写死的,有什么办法可以处理这种场景呢 -- Best, Benchao Li

Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-03 文章 Benchao Li
020年9月2日(星期三) 下午3:20 > 收件人:"user-zh" > 主题:请指教一个关于时间窗的问题,非常感谢! > > > > 大牛好:使用flink SQL,希望可以通过tumble window(每小时)来计算,现在遇到问题是到整点时,是没有触发计算的,请帮忙看看! > > //指定eventtime字段及生成watermark > DataStream withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( > WatermarkStrategy > > . //. .withIdleness(Duration.ofSeconds(10)) //即时没数据时,也生成watermark > .withTimestampAssigner((event, timestamp)-event.f3)); > > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > tenv.registerDataStream( > "log", > withTimestampsAndWatermarksDS, > "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); > > String sql = "select appid,eventid,cnt," + > > "(starttime + interval '8' hour ) as stime," + > > "(endtime + interval '8' hour ) as etime " > + > "from > (select appid,eventid,count(*) as cnt," + > > "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + > > "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + > "from > log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME > '00:00:00')"; //希望整点结束时触发时间窗关闭 > > Table table = tenv.sqlQuery(sql); > DataStream Result.class); > > 输出的结果是: > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > 期待的是2020-09-01 18:00:00.0结束时触发关闭窗口,结果是没有的。 > (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 > 等到这条数据上来后才触发 > ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 > 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 > //2020/9/2 15:23:35} > 请问一下哪里出了问题?万分感谢! > -- Best, Benchao Li

Re: flink json ddl解析

2020-09-02 文章 Benchao Li
nslated to row) > > > > > >else if (items.isArray()) { > > > > > > final TypeInformation[] types = convertTypes(location + > > '/' > > > + > > > > > > ITEMS, items, root); > > > > > > > > > > > > // validate that array does not contain additional items > > > > > > if (node.has(ADDITIONAL_ITEMS) && > > > > > > node.get(ADDITIONAL_ITEMS).isBoolean() && > > > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) { > > > > > > throw new IllegalArgumentException( > > > > > > "An array tuple must not allow additional items in > > node: > > > " > > > > > > + location); > > > > > > } > > > > > > > > > > > > return Types.ROW(types); > > > > > >} > > > > > >throw new IllegalArgumentException( > > > > > > "Invalid type for '" + ITEMS + "' property in node: " + > > > > location); > > > > > > } > > > > > > > > > > > > > > > > > > > > > -- Best, Benchao Li

Re: flink1.11时间函数

2020-08-28 文章 Benchao Li
个此功能不确定指的是这两个时间函数不能用吗 > -- Best, Benchao Li

Re: Flink 维表延迟join

2020-08-27 文章 Benchao Li
案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度 > > 请问各位有什么更好的建议嘛?感谢 > > 原始邮件 > 发件人: Leonard Xu > 收件人: Jark Wu > 抄送: user-zh; Benchao Li > 发送时间: 2020年8月27日(周四) 20:11 > 主题: Re: Flink 维表延迟join > > > 多谢 Jark 提议 > > Issue[1] 建好了, 大家可以在issue下讨论。 > >

Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Benchao Li
;>>>> Hi all, > > > > > >>>>> > > > > > >>>>> On behalf of the Flink PMC, I'm happy to announce that > Dian Fu > > is > > > > now > > > > > >>>>> part of the Apache Flink Project Management Committee > (PMC). > > > > > >>>>> > > > > > >>>>> Dian Fu has been very active on PyFlink component, > working on > > > > various > > > > > >>>>> important features, such as the Python UDF and Pandas > > > integration, > > > > > and > > > > > >>>>> keeps checking and voting for our releases, and also has > > > > successfully > > > > > >>>>> produced two releases(1.9.3&1.11.1) as RM, currently > working as > > > RM > > > > > to push > > > > > >>>>> forward the release of Flink 1.12. > > > > > >>>>> > > > > > >>>>> Please join me in congratulating Dian Fu for becoming a > Flink > > PMC > > > > > >>>>> Member! > > > > > >>>>> > > > > > >>>>> Best, > > > > > >>>>> Jincheng(on behalf of the Flink PMC) > > > > > >>>>> > > > > > >>>> > > > > > >> > > > > > >> -- > > > > > >> Best regards! > > > > > >> Rui Li > > > > > >> > > > > > > > > > > > > > > > > > > > > > -- Best, Benchao Li

Re: Flink 维表延迟join

2020-08-26 文章 Benchao Li
。 > > Thanks -- Best, Benchao Li

Re: 关于sink失败 不消费kafka消息的处理

2020-08-26 文章 Benchao Li
sink > > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto > commit > > > offset 看起来似乎没有什么区别 > > > > > > 可否具体解释一下? 谢谢! > > > > > > Eleanore > > > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li > wrote:

Re: 关于sink失败 不消费kafka消息的处理

2020-08-25 文章 Benchao Li
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。 范超 于2020年8月26日周三 上午11:38写道: > 大家好,我现在有个疑问 > 目前我使用kafka作为source,经过计算以后,将结果sink到数据库; > > 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢? > > > 多谢大家了 > > 范超 > -- Best, Benchao Li

Re: flink1.11 sql问题

2020-08-25 文章 Benchao Li
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写? > > > > > 在2020年08月25日 14:05,酷酷的浑蛋 写道: > 我知道了 > > > > > 在2020年08月25日 13:58,酷酷的浑蛋 写道: > > > > > flink1.11 > > 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理? > > -- Best, Benchao Li

Re: Elasticsearch 写入问题

2020-08-23 文章 Benchao Li
你指的是用json format么?这个是json format的问题,当前的确是会把所有字段都写进去,不管是否为null。 我们内部也有类似需求,我们是修改了json format,可以允许忽略null的列。 于2020年8月23日周日 下午6:11写道: > 在flink1.11中使用table sql写入时,有一些字段为空时,依然被写入到elasticsearch,这个方式应该不太恰当。 > > > -- Best, Benchao Li

Re: 1.11版本,关于TableEnvironment.executeSql("insert into ..."),job名称设置的问题

2020-08-23 文章 Benchao Li
; + > >" c_val bigint, " + > >" wStart TIMESTAMP(3) " + > >") WITH ('connector' = 'print') "; > > bsTableEnv.executeSql(sinkDDL); > > > > System.out.println(bsTableEnv.from("print_table").getSchema()); > > > > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str), > TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by > TUMBLE(ts, INTERVAL '5' second), f_random"); > > bsTableEnv.executeSql("insert into print_table select * from " + table); > > > -- Best, Benchao Li

Re: Re: 如何设置FlinkSQL并行度

2020-08-21 文章 Benchao Li
有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。 > >>> > > >>> > 发件人: "Zhao,Yi(SEC)" > >>> > 日期: 2020年8月13日 星期四 上午11:44 > >>> > 收件人: "user-zh@flink.apache.org" > >>> > 主题: 如何设置FlinkSQL并行度 > >>> > > >>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。 > >>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢? > >>> > > >>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。 > >>> > > >>> > > >>> > > > > > -- Best, Benchao Li

Re: JDBC connector 写入 mysql 每秒大约只有 200KB 是正常的吗

2020-08-20 文章 Benchao Li
quot;insert into result_stuff select stuff_id, > stuff_base_id, stuff_name from gen_stuff"); > } > } > ``` > > 然而,mysql 每秒大约只多 1 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。 > > 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。 > > 我使用的和 jdbc 有关的依赖如下: > > ```xml > > org.apache.flink > > flink-connector-jdbc_${scala.binary.version} > ${flink.version} > > > mysql > mysql-connector-java > 8.0.21 > > ``` > > (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li

Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 Benchao Li
流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。 > > Benchao Li 于2020年8月20日周四 下午4:40写道: > > > Hi, > > > > 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。 > > > > shizk233 于2020年8月20日周四 下午2:22写道: > > > > > Hi all, > > > > > > 请教一下,

Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能

2020-08-20 文章 Benchao Li
例,其中的MapState应该也是一个实例,那么不同key下的 > Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗? > -- Best, Benchao Li

Re: flink 1.11 web ui请教

2020-08-19 文章 Benchao Li
2.底下的 records received等几个列都是0。怎么样才会统计? > > > -- Best, Benchao Li

Re: flink 1.11 order by rowtime报错

2020-08-19 文章 Benchao Li
> 收件人:user-zh > 主 题:Re: flink 1.11 order by rowtime报错 > > 错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > > -- Best, Benchao Li

Re: hive Streaming Reading 无法分组统计

2020-08-19 文章 Benchao Li
FROM > hive_user_parquet > GROUP BY > id > /*+ OPTIONS('streaming-source.enable'='true', > 'streaming-source.consume-start-offset'='2020-08-18') */ > 通过分组统计好像是会报语法错误的,这是什么原因造成的呢 > > > > 18579099...@163.com > -- Best, Benchao Li

Re: Print SQL connector无法正常使用

2020-08-19 文章 Benchao Li
r.zookeeper.quorum' = ‘IP:port', > 'connector.zookeeper.znode.parent' = '/hbase', > ); > > > CREATE TABLE print_table ( > f0 STRING, > f1 INT, > f2 BIGINT, > f3 BIGINT > ) WITH ( > 'connector' = 'print' > ); > > > insert into print_table > select rowKey, cf.age, cf.area, tas > from dimension -- Best, Benchao Li

  1   2   3   4   >