tion=[hash[uid,
> uname]])
> nbsp; nbsp; nbsp; nbsp; nbsp;+-
> Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
> nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; +-
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]],
> fields=[uid, uname, dt, uage])
>
>
>
> 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
>
>
>
> --
>
> Best,
> Benchao Li
--
Best,
Benchao Li
catalog, default_database, kafkaTable]],
> fields=[uid, uname, dt, uage])
>
>
>
> 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
--
Best,
Benchao Li
lable, and accurate data
> > > > streaming
> > > > >> applications.
> > > > >>>>
> > > > >>>> The release is available for download at:
> > > > >>>> https://flink.apache.org/downloads.html
> > > > >>>>
> > > > >>>> Please check out the release blog post for an overview of the
> > > > >> improvements for this bugfix release:
> > > > >>>>
> > > > >>
> > > >
> > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> > > > >>>>
> > > > >>>> The full release notes are available in Jira:
> > > > >>>>
> > > > >>
> > > >
> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
> > > > >>>>
> > > > >>>> We would like to thank all contributors of the Apache Flink
> > > community
> > > > >> who made this release possible!
> > > > >>>>
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Yun, Jing, Martijn and Lincoln
> > > > >>
> > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
--
Best,
Benchao Li
enix表进行scan查询,再根据scan查询结果按trans_date字段值进行过滤
>
>
> https://issues.apache.org/jira/browse/FLINK-34170
--
Best,
Benchao Li
wrote:
> >> 我的sql如下:
> >> 、
> >>
> >>
> >> t_purch_apply_sent_route 是通过flink cdc创建的
> >> t_purch_apply_sent_route_goods 是普通的jdbc
> >> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据
> >> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推
> >> 这应该算是bug吧,或者要满足我的预期,该怎么写sql?
> >>
> >>
> >>
> >>
--
Best,
Benchao Li
with existing
> Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on.
> 这里有提到Beeline工具,难道不是 beeline> !connect jdbc:flink://localhost:8083 这样的连接方式了?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-10-30 11:27:1
. . . . . . .> 'format' = 'csv'
> . . . . . . . . . . . . . . . . . . . .> );
> Failed to create the executor.
> Connection is already closed.
>
--
Best,
Benchao Li
applications.
> > >>>>>>
> > >>>>>> The release is available for download at:
> > >>>>>> https://flink.apache.org/downloads.html
> > >>>>>>
> > >>>>>> Please check out the release blog post for an overview of the
> > >>>>> improvements
> > >>>>>> for this release:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > >>>>>>
> > >>>>>> The full release notes are available in Jira:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885
> > >>>>>>
> > >>>>>> We would like to thank all contributors of the Apache Flink
> > >> community
> > >>>> who
> > >>>>>> made this release possible!
> > >>>>>>
> > >>>>>> Best regards,
> > >>>>>> Konstantin, Qingsheng, Sergey, and Jing
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
--
Best,
Benchao Li
ata
> toString,BinaryRowData没有实现该方法QQAQ
>
> Benchao Li 于2021年4月9日周五 10:42写道:
>
> > GenericRowData和BinaryRowData都是RowData这个接口的具体实现。
> > 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。
> >
> > 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData,
> > 比如典型的就是序列化的时候都会按照BinaryRowData来
格式,Flink SQL Client 没有直接支持。
>
> Flink SQL Client 有一个 Raw 格式选项,不过我们还没有找到如何使用这个的文档;
>
> 看看大家有没有相关的经验可以分享
>
> 多谢
>
--
Best,
Benchao Li
mini-batch对aggregate算子是有效的,开启了之后它的输出会降低一些,从而降低了sink的输出压力。
casel.chen 于2022年1月7日周五 07:42写道:
> mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-01-06 20:43:00,"Bench
>>
> >>
> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
> >>
> >>
> >>
> >> INSERT INTO mysql_sink_table
> >>
> >> SELECT category, dt, LAST_VALUE(total)
> >>
> >> OVER (
> >>
> >> PARTITION BY category
> >>
> >> ORDER BY PROCTIME()
> >>
> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
> >>
> >> ) AS var1
> >>
> >> FROM (
> >>
> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
> >>
> >> );
>
--
Best,
Benchao Li
ds_k], fields=[id,
> > name])
> > Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id,
> > name]))
> >
> >
> > TableSourceScan 和 Sink相比多了个 中括号,并且采用 ',' 分割名字功空间,这是为什么
> >
>
--
Best,
Benchao Li
mat' = 'json'
> );
>
>
>
> SELECT
> user_id,
> item_id,
> behavior,
> next_bv
> FROM
> ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER
> BY proctime ) AS next_bv FROM KafkaTable ) t;
--
Best,
Benchao Li
这个问题已经在1.12中修复了,参考:
https://issues.apache.org/jira/browse/FLINK-18688
Benchao Li 于2021年8月30日周一 下午7:49写道:
> Hi xingxing,
>
> 看起来你可能也遇到了这个bug了。
> 我们遇到过一个bug是这样的,在group by的多个字段里面,如果有2个及以上变长字段的话,会导致底层的BinaryRow序列化
> 的结果不稳定,进而导致状态恢复会错误。
> 先解释下变长字段,这个指的是4byte存不下的数据类型,比如典型的varchar、li
; 'article_auto_video_play_click'))a
> groupby platform, type, `time`;
>
>
> 期待大家的帮助与回复,希望能给些问题排查的思路!
>
>
>
>
--
Best,
Benchao Li
MBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
> >> `dim`,
> >> count(1),
> >> FROM input_table
> >> GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`
> >>
> >>
> >>
> >> *Best Regards*
> >> *Jeremy Mei*
> >>
> >
> >
> > --
> >
> > *Best Regards*
> > *Jeremy Mei*
> >
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>
--
Best,
Benchao Li
time层把GenericRowData转换BinaryRowData的规则是什么?
>
--
Best,
Benchao Li
嗯,是这样的。
datayangl 于2021年3月19日周五 下午5:55写道:
> calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
> 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
> is
> not null 可以正常运行并一直产生数据。
>
> 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
Hi,
你可以理解为用的是MapState来保存的状态。
op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道:
> 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态
--
Best,
Benchao Li
ture 社区有规划了吗?
>
>
> Benchao Li 于2021年3月3日周三 上午10:23写道:
>
> > 当前的确是还没有表达式复用的优化,所以表达式最终都是会重复执行的。
> > 这个应该是未来要优化的一个点,我们内部也是刚刚做了这个feature。
> >
> > 这个没有复用不只是在SQL里面看到的多少次,就会执行多少次,而是在
> > plan的过程中会将表达式完全展开,比如下面的SQL:
> > ```SQL
> > SELECT my_map['
`dump_json_to_map`这个函数执行3次。
HunterXHunter <1356469...@qq.com> 于2021年3月3日周三 上午9:43写道:
> 为什么4次是没问题的,感觉只执行一次才是最优的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
> """.stripMargin)
> >
> > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> >
> > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > subtask的watermark。
> > ---
>
反压的话,你可以重点看下你使用的是什么state backend,
如果是filesystem,那状态就是放heap的,这种你需要重点看下gc相关的问题;
如果是rocksdb,这种状态是直接序列化到rocksdb中了,一般很少有内存问题,更多的是IO问题,或者CPU瓶颈。
你可以按照找个思路排查一下。
song wang 于2020年12月10日周四 上午11:38写道:
> hi,Benchao,
> 是的,任务失败时,右流出现了反压,已经连续两天出现这个问题了,我看下为啥会出现反压,感谢!
>
> Benchao Li 于2020年12月10日
ig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> 各位大佬能否给些排查建议呢?
>
>
>
>
>
>
>
--
Best,
Benchao Li
2)此外,还有一个点,这个我也不确认。如果是datastream
> api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>
> macia kk 于2020年12月9日周三 上午1:17写道:
>
> > @Benchao Li 感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>
MySQL DB 所有的 Binlog 打进去了。我的
> > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> > > item
> > > > >信息,所以 我用:
> > > > >
> > > > > SELECT *
> > > > > FROM A
> > > > > LEFT OUT JOIN B
> > > > > ON order_id
> > > > > Where A.event_time > B.event_time + 30 s
> > > > > A.event_time > B.event_time - 30 s
> > > > >
> > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > Structural
> > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > >
> > >
> >
>
--
Best,
Benchao Li
使用的默认的blink),
> 但是总是报错函数的用法不对,有相关资料来介绍这些函数的使用方法的吗?或者示例
> >
> > 闫云鹏
> > DXM 支付业务部
> > 地址:北京市海淀区西北旺东路度小满金融总部
> > 邮编:100085
> > 手机:13693668213
> > 邮箱:
>
> > yanyunpeng@
>
> > mailto:
>
> > yanyunpeng@
>
> >
> >
> > 度小满金融
> >
> > 精于科技 值得信赖
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
--
Best,
Benchao Li
zilong xiao 于2020年11月24日周二 下午4:46写道:
> 这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢?
>
> Benchao Li 于2020年11月24日周二 下午4:33写道:
>
> > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。
> >
> > zilong xiao 于2020年11月24日周二 下午4:13写道:
> >
> > > 用的Flink1.11 不过是用的别人写的format,估计是
看起来这个format是用的自动推导schema,而不是用的DDL写的schema。
zilong xiao 于2020年11月24日周二 下午4:13写道:
> 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧,
> https://github.com/yangyichao-mango/flink-protobuf
>
> Benchao Li 于2020年11月24日周二 下午3:43写道:
>
> > 看起来你的DDL写的没有什么问题。
> >
> > 你用的
看起来你的DDL写的没有什么问题。
你用的是哪个Flink版本呢?
此外就是可以发下更完整的异常栈么?
zilong xiao 于2020年11月24日周二 下午2:54写道:
> Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~
>
> Benchao Li 于2020年11月24日周二 下午2:49写道:
>
> > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
> >
> > zilong xiao 于2020年11月24日
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。
zilong xiao 于2020年11月24日周二 上午10:49写道:
> [image: image.png]
> 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。
>
--
Best,
Benchao Li
//apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
current SQL conformance
> level
>
>
> 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复
>
>
>
>
> 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml
--
Best,
Benchao Li
现在的json format就是支持大小写区分的吧。可以提供下你的DDL和样例数据么?
王刚 于2020年10月21日周三 下午8:08写道:
> hi~ 各位大佬,
>
> 由于catalog是不支持大小写 如果kafka 的数据是json格式,且某些json的key区分大小写的,
>
> 这个时候建的在hive catalog里建的kafka表的某些json字段是取不到的
>
> flink 1.11有现成解决的方案可以解决这种问题不
>
--
Best,
Benchao Li
嗯,道理是一样的。ROW/MAP/ARRAY这些本来就是嵌套类型,嵌套深度没有限制
Roc Marshal 于2020年10月21日周三 下午2:38写道:
> 如果是深度是三层以上也是类似的嵌套语法吗?或者说是其他的写法?
>
>
> 谢谢
>
> Best Roc.
>
>
>
>
>
> 在 2020-09-24 20:53:12,"Benchao Li" 写道:
> >这个情况现在是支持的,可以用类似于这种写法:
> >```SQL
> >CRE
ming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
> at
>
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
s2.id where
> s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime +
> INTERVAL '5' SECOND;
> >
> >
> >最后的 join 语句预期是 如果两个source的消息, 先后到达时间超过 10 秒,则输出,
> 两条消息。
> >
> >
> >目前的观察结果是,如果两条消息, 先后到达时间超过10 秒,输出为:,
> >为何超过 10 秒后,仍然会输出 ?
>
--
Best,
Benchao Li
发送时间: 2020年10月16日 6:47
> 收件人: user-zh@flink.apache.org
> 主题: Re: 回复:回复: flink 自定义udf注册后不能使用
>
> 是的,是我传参有问题
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
可以具体描述下你的问题么,没太看懂你的问题。
smallwong 于2020年10月14日周三 下午6:57写道:
> 哈喽,请问是做了什么调整?才10秒的窗口,期待每秒都输出结果的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
是的,所以应该用createNullableConverter,而不是createConverter
史 正超 于2020年10月14日周三 上午10:45写道:
>
> 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is
> not null.
> 代码这样写的前提是,不允许对象的值为null的。
> ____
> 发件人: Benchao Li
> 发送时间: 2020年10月14
p类型的value值为null,会报空指针异常的。
>
> 发件人: 奔跑的小飞袁
> 发送时间: 2020年10月14日 1:46
> 收件人: user-zh@flink.apache.org
> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常
>
> other_para MAP
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
> return avroObject -> {
> final Map map = (Map) avroObject;
> Map result = new HashMap<>();
> for (Map.Entry entry : map.entrySet()) {
> Object key =
> keyConverter.convert(entry.getKey());
> Object value =
> valueConverter.convert(entry.getValue());
> result.put(key, value);
> }
> return new GenericMapData(result);
> };
> }
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
gt; "FETCH" ...
> > "FROM" ...
> > "INTERSECT" ...
> > "LIMIT" ...
> > "OFFSET" ...
> > "ORDER" ...
> > "MINUS" ...
> > "UNION" ...
> > "," ...
> >
> >
> >
> > Table result = tEnv.sqlQuery("select * from OrderA join OrderB on
> > OrderA.user=OrderB.user");
>
--
Best,
Benchao Li
chao的回复是的对的,
> 你用SQL client 时, 不需要datastream connector的jar包,直接用SQL connector 对应的jar包
> flink-*sql*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
>
>
> > 相关lib包:
> > flink-connector-kafka_2.12-1.10.2.jar
> > kafka-clients-0.11.0.3.jar
>
> 祝好
> Leonard
>
--
Best,
Benchao Li
并行度为2,甚至更大时;程序可以执行,但是我的kafka中没有看到有数据输出?请问是什么原因呢?
> 使用stream api时,我们可以给每个算子设置并行度,那sql api我们是否可以给每条sql设置并行度?
--
Best,
Benchao Li
ntln("time out ... ")
> }
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
>
> 不太清楚为什么用了mini batch就没读取这个配置。
> 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
d":"1", "speed":4.0,"speed_1":3.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
> "speed_2":4.0}
>
> 实际得到的结果数据是:
> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
> "speed_2":1.0}
> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
> "speed_2":2.0}
> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
> "speed_2":3.0}
> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
> "speed_2":4.0}
> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
> "speed_2":5.0}
> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
> "speed_2":6.0}
>
> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
l*-connector-kafka***.jar就行了,把你添加的其他jar包都删掉。
> >>
> >>
> >> > 相关lib包:
> >> > flink-connector-kafka_2.12-1.10.2.jar
> >> > kafka-clients-0.11.0.3.jar
> >>
> >> 祝好
> >> Leonard
> >
> >
>
--
Best,
Benchao Li
VAL '5' SECOND
> topic=heli01
>
> The following factories have been considered:
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>
>
>
> hl9...@126.com
>
--
Best,
Benchao Li
bsTableEnv.executeSql("select * from a.b")
>
>
> 然后发现了以下现象:
>
>
> 从图中可以得知,在`DatabaseCalciteSchema` 中
> 我发现下面几个奇怪的点
>
>1. databaseName 是 ‘'default'
>2. getTable将 `a`作为参数传入,而不是b (a是库名,b是表名)
>
>
>
> 首先可以确定的是这个发生在validation阶段
>
> 其次我发现特意针对这块做了一次catch `TableNotExistException`的操作
>
> 请问这部分代码的用途和目的是?
>
>
>
>
--
Best,
Benchao Li
nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
> >gt;发送时间:amp;nbsp;2020年9月27日(星期天) 下午4:59
> >gt;收件人:amp;nbsp;"user-zh" amp;gt;;
> >gt;
> >gt;主题:amp;nbsp;Re:flink sql count问题
> >gt;
> >gt;
> >gt;
> >gt;最好把null 变成0,amp;nbsp; 你这样amp;nbsp;amp;nbsp;
> sum(if(name like '南京%',1 , 0))
> >gt;在 2020-09-27 16:53:56,"zya" 写道:
> >gt;amp;gt;请教各位:
> >gt;amp;gt;我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> >gt;amp;gt;之前在hive中是这么写的:count(if(name like '南京%',1 ,
> null)),但是flink sql中count不能为null,有什么别的方法能实现该功能吗?
> >gt;amp;gt;使用的是flink1.10.1 blink
> >gt;amp;gt;amp;amp;nbsp;
>
--
Best,
Benchao Li
变了。
> >
> >Michael Ran 于2020年9月27日周日 下午2:39写道:
> >
> >> 滑动窗口
> >> 在 2020-09-27 13:25:37,"赵一旦" 写道:
> >> >如题,不使用DatastreamAPI,使用FlinkSQL能否实现五分钟窗口,每10秒输出一次呢?
> >>
>
--
Best,
Benchao Li
基于1.11测试的。目前来看,json format的2个设置都设置好。然后event
> time部分使用COALESCE将null情况设置为event_time 0。这么做是最好的情况啦。
>
> Benchao Li 于2020年9月25日周五 下午2:08写道:
>
> > 你用的是哪个版本?1.11版本应该是改进过这块,不应该出现这个情况。
> >
> > 赵一旦 于2020年9月25日周五 上午11:02写道:
> >
> > > 而且按照string无法接受"a":a,
如果数据为 t: abc 则数据直接非法被忽略。
> > 如果数据为t: "abc",则t被转为null?
> > 当然eventtime本身还有个不可null的限制,我通过COALESCE解决了。
> >
> >
> > 想知道有没有什么规则,尽可能避免任务失败的。因为数据一旦有一点异常导致失败就会很麻烦。
> >
> > 比如那个忽略错误,实际是无法解决event time为null的情况的这种错误的。
> > 我是通过COALESCE解决的。
> >
>
--
Best,
Benchao Li
"a13":{
> "a21":1,
> "a22":1,
> "a23":"1"}
> }
>
>
> 比如像这样的格式,怎么将a2开头的字段进行映射呢?如果现有版本不支持这个特性的话,是否可以考虑对此功能进行支持?
>
>
> 谢谢
--
Best,
Benchao Li
> 的tm的内存大小,kafka的partition数目,也无法将作业的吞吐量压上去。
>
>
>
> -邮件原件-
> 发件人: Benchao Li [mailto:libenc...@apache.org]
> 发送时间: 2020年9月18日 星期五 18:49
> 收件人: user-zh
> 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
>
> 提交两个作业的话,两个作业是完全独立的,都会消费全量数据。
>
> 一个
@qq.com 于2020年9月23日周三 下午4:24写道:
>
> hi各位,有个问题请教一下:
> 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event
> time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为
> WATERMARK FOR ts AS tsnbsp; - INTERVAL '5' SECODND
> ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream
> api里面可以使用allowed
> lateness,但是这部分在sql中没看到有相关语法
>
>
> Flink版本1.10.1
> nbsp;
>
>
>
> --
>
> Best,
> Benchao Li
--
Best,
Benchao Li
+1
nashcen <2415370...@qq.com> 于2020年9月24日周四 下午1:09写道:
> +1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
相关语法
>
>
> Flink版本1.10.1
>
--
Best,
Benchao Li
.print(message)
> 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json,
> 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps.
> 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2
> 、社区对pb format 会支持么?
> 3、pb转json 有什么性能比较好的工具包
--
Best,
Benchao Li
是4G。版本1.11.1
> ------
> 发件人:Benchao Li
> 发送时间:2020年9月23日(星期三) 10:50
> 收件人:user-zh
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> Hi Tianwang,
>
> 不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
> 1. time interval join会将watermark delay之后再发送,也
intGCDateStamps -XX:+UseGCLogFileRotation
> -XX:NumberOfGCLogFiles=10
> > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> >
>
>
>
> --
> **
> tivanli
> **
>
--
Best,
Benchao Li
这个性能影响指的是跟那种情况进行对比呢?
smq <374060...@qq.com> 于2020年9月21日周一 下午6:49写道:
> 谢谢,多问一句,并行度为1的话,keyby算子加上keydstate对性能影响大吗
>
>
>
> ---原始邮件---
> 发件人: "Benchao Li" 发送时间: 2020年9月21日(周一) 下午4:39
> 收件人: "user-zh" 主题: Re: 消费kafka source反压
>
>
> 这种反压一般是下游反压过
可以通过SQL的where条件来过滤吧
chuyuan 于2020年9月21日周一 下午6:48写道:
> 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
Benchao Li
uot; dt string" +
> " ) stored as orcfile " +
> " TBLPROPERTIES" +
> " (" +
>
> "'partition.time-extractor.kind'='custom'," +
>
> "'partition.time-extractor.timestamp-pattern'='$dt'," +
>
>
> "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
> +
>
> "'sink.partition-commit.trigger'='partition-time'," +
>
> "'sink.partition-commit.delay'='0s'," +
>
> "'sink.partition-commit.policy.kind'='metastore'" +
> ")");
>
>
> 第三步,把临时表的数据insert into到目标表,此时出现异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
>
> 然后打印临时表的数据结构,发现lib和properties在临时表中数据结构被解析为:
> |-- lib: LEGACY('RAW', 'ANY')
> |-- properties: LEGACY('RAW', 'ANY')
> |-- track_id: BIGINT
> |-- type: STRING
> ,这说明lib LEGACY('RAW', 'ANY')无法匹配hive目标表中lib
> MAP数据结构,写入失败,大概流程是这样。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
这种反压一般是下游反压过来的,可以检查下最后一个反压的算子,那个才是处理能力的瓶颈。
smq <374060...@qq.com> 于2020年9月21日周一 下午2:08写道:
>
> 大家好,在测试flink消费速率时,发现数据处理比较慢,大概一个task每秒处理1000条左右,经过查看UI界面,发现读取kafka数据源这块source反压达到1,请问有这方面经验吗?
--
Best,
Benchao Li
日周一 上午10:21写道:
> > >
> > >> 可以使用calcite。解析kind为CREATE_TABLE的语句,解析INSERT,下推from的表。
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2020-09-21 10:12:13,"Harold.Miao" 写道:
> > >> >hi all
> > >> >
> > >> >请教大家在复杂sql语句中parse所有的table name是怎么实现的。
> > >> >
> > >> >谢谢
> > >> >
> > >> >--
> > >> >
> > >> >Best Regards,
> > >> >Harold Miao
> > >>
> > >
> > >
> > >--
> > >
> > >Best Regards,
> > >Harold Miao
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>
--
Best,
Benchao Li
presentation. It needs to be
> resolved into a proper raw type.
> 方便说下具体实现细节吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
倍的。
>
> 我想达到的目的不是通过多向yarn集群提交多一个app,而是通过设置并行度来提高应用的吞吐量。。
>
> 求各位大佬指导
>
--
Best,
Benchao Li
ksql 滑动窗口数据进不来的问题
>
>
>
> 可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢?
>
> 如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。
>
> 李杨烨 <438106...@qq.com 于2020年9月14日周一 下午1:32写道:
>
> 刚刚邮件图片挂了,上传了新的图片地址:
> http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
> 使用rowTime做的滑动
>
>
>
> --
>
> Best,
> Benchao Li
--
Best,
Benchao Li
可以再详细一点描述下问题么,滑动窗口数据进不来,指的是窗口没有触发计算还是数据就没有到窗口呢?
如果只是窗口没有触发计算,一般用了row time的话,可以排查下watermark是否有正常生成。
李杨烨 <438106...@qq.com> 于2020年9月14日周一 下午1:32写道:
> 刚刚邮件图片挂了,上传了新的图片地址:http://chuantu.xyz/t6/741/1600061331x-1224481926.jpg
> 使用rowTime做的滑动
--
Best,
Benchao Li
你好,你的图片挂了,可以把图片放到第三方图床工具然后把链接发出来。或者直接用文本描述的问题。
李杨烨 <438106...@qq.com> 于2020年9月14日周一 上午11:25写道:
>
> 根据rowTime做的滑动
--
Best,
Benchao Li
gt; 'com.binance.risk.flink.udf.AggDistinctDetail';
>
> select
> realIp ,
> UDF_InfoDistinctMerge(userId) over w1 as userSet
> from source_table
> window w1 as (partition by realIp order by requestDateTime asc RANGE
> BETWEEN
> INTERVAL '24' hour preceding AND CURRENT ROW) ;
>
> 实际测试下来,发现聚集后的字符串userSet是一直在增长,即使窗口时间已经超过24小时,依然被聚集到userSet这个结果中,这和预期不符。
>
> 问题:
> 是上面UDAF的实现有啥问题么?还是UDAF在over窗口上有bug?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
lec ssmi 于2020年9月10日周四 下午2:35写道:
> 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
> 感觉底层和 last_value() group by id是一样的。
>
> Benchao
金额。那么总金额就是错的。nbsp;nbsp;
> 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
>
>
> nbsp; 刚入坑实时处理,请大神赐教
>
>
>
> --
>
> Best,
> Benchao Li
--
Best,
Benchao Li
GMV实时统计为1000.
> 然后下午15点,009订单金额被修改为500。数据生成json也会进入kafka.
> 这时如果不减去上午已经统计的金额。那么总金额就是错的。
> 请问是不是根据 update /delete 要写这个减去的逻辑。按日去重是不行了,因为是增量处理不能,上午的数据已经被处理了不能再获取了。
>
>
> 刚入坑实时处理,请大神赐教
--
Best,
Benchao Li
eam>,再转换flatMap转换成DataStream,再使用tableEnv.fromDataStream把它变成tableSource?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
log,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create
> table时写死的,有什么办法可以处理这种场景呢
--
Best,
Benchao Li
020年9月2日(星期三) 下午3:20
> 收件人:"user-zh"
> 主题:请指教一个关于时间窗的问题,非常感谢!
>
>
>
> 大牛好:使用flink SQL,希望可以通过tumble window(每小时)来计算,现在遇到问题是到整点时,是没有触发计算的,请帮忙看看!
>
> //指定eventtime字段及生成watermark
> DataStream withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
>
> . //. .withIdleness(Duration.ofSeconds(10)) //即时没数据时,也生成watermark
> .withTimestampAssigner((event, timestamp)-event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
>
> "(starttime + interval '8' hour ) as stime," +
>
> "(endtime + interval '8' hour ) as etime "
> +
> "from
> (select appid,eventid,count(*) as cnt," +
>
> "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
>
> "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
> "from
> log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME
> '00:00:00')"; //希望整点结束时触发时间窗关闭
>
> Table table = tenv.sqlQuery(sql);
> DataStream Result.class);
>
> 输出的结果是:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> 期待的是2020-09-01 18:00:00.0结束时触发关闭窗口,结果是没有的。
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
> 等到这条数据上来后才触发
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01
> 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481
> //2020/9/2 15:23:35}
> 请问一下哪里出了问题?万分感谢!
>
--
Best,
Benchao Li
nslated to row)
> > > > > >else if (items.isArray()) {
> > > > > > final TypeInformation[] types = convertTypes(location +
> > '/'
> > > +
> > > > > > ITEMS, items, root);
> > > > > >
> > > > > > // validate that array does not contain additional items
> > > > > > if (node.has(ADDITIONAL_ITEMS) &&
> > > > > > node.get(ADDITIONAL_ITEMS).isBoolean() &&
> > > > > > node.get(ADDITIONAL_ITEMS).asBoolean()) {
> > > > > > throw new IllegalArgumentException(
> > > > > > "An array tuple must not allow additional items in
> > node:
> > > "
> > > > > > + location);
> > > > > > }
> > > > > >
> > > > > > return Types.ROW(types);
> > > > > >}
> > > > > >throw new IllegalArgumentException(
> > > > > > "Invalid type for '" + ITEMS + "' property in node: " +
> > > > location);
> > > > > > }
> > > > > >
> > > > >
> > > >
> > >
> >
>
--
Best,
Benchao Li
个此功能不确定指的是这两个时间函数不能用吗
>
--
Best,
Benchao Li
案一不能保证数据量一定可以全部加载到内存,方案二又需要额外的外部存储,增加了系统结构的复杂度
>
> 请问各位有什么更好的建议嘛?感谢
>
> 原始邮件
> 发件人: Leonard Xu
> 收件人: Jark Wu
> 抄送: user-zh; Benchao Li
> 发送时间: 2020年8月27日(周四) 20:11
> 主题: Re: Flink 维表延迟join
>
>
> 多谢 Jark 提议
>
> Issue[1] 建好了, 大家可以在issue下讨论。
>
>
;>>>> Hi all,
> > > > > >>>>>
> > > > > >>>>> On behalf of the Flink PMC, I'm happy to announce that
> Dian Fu
> > is
> > > > now
> > > > > >>>>> part of the Apache Flink Project Management Committee
> (PMC).
> > > > > >>>>>
> > > > > >>>>> Dian Fu has been very active on PyFlink component,
> working on
> > > > various
> > > > > >>>>> important features, such as the Python UDF and Pandas
> > > integration,
> > > > > and
> > > > > >>>>> keeps checking and voting for our releases, and also has
> > > > successfully
> > > > > >>>>> produced two releases(1.9.3&1.11.1) as RM, currently
> working as
> > > RM
> > > > > to push
> > > > > >>>>> forward the release of Flink 1.12.
> > > > > >>>>>
> > > > > >>>>> Please join me in congratulating Dian Fu for becoming a
> Flink
> > PMC
> > > > > >>>>> Member!
> > > > > >>>>>
> > > > > >>>>> Best,
> > > > > >>>>> Jincheng(on behalf of the Flink PMC)
> > > > > >>>>>
> > > > > >>>>
> > > > > >>
> > > > > >> --
> > > > > >> Best regards!
> > > > > >> Rui Li
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
>
--
Best,
Benchao Li
。
>
> Thanks
--
Best,
Benchao Li
sink
> > > 不是两段式提交,那么checkpoint 的state 就只是source 的 offset,这种情况下和使用kafka auto
> commit
> > > offset 看起来似乎没有什么区别
> > >
> > > 可否具体解释一下? 谢谢!
> > >
> > > Eleanore
> > >
> > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li
> wrote:
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。
范超 于2020年8月26日周三 上午11:38写道:
> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>
--
Best,
Benchao Li
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
>
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>
>
--
Best,
Benchao Li
你指的是用json format么?这个是json format的问题,当前的确是会把所有字段都写进去,不管是否为null。
我们内部也有类似需求,我们是修改了json format,可以允许忽略null的列。
于2020年8月23日周日 下午6:11写道:
> 在flink1.11中使用table sql写入时,有一些字段为空时,依然被写入到elasticsearch,这个方式应该不太恰当。
>
>
>
--
Best,
Benchao Li
; +
> >" c_val bigint, " +
> >" wStart TIMESTAMP(3) " +
> >") WITH ('connector' = 'print') ";
> > bsTableEnv.executeSql(sinkDDL);
> >
> > System.out.println(bsTableEnv.from("print_table").getSchema());
> >
> > Table table = bsTableEnv.sqlQuery("select f_random, count(f_random_str),
> TUMBLE_START(ts, INTERVAL '5' second) as wStart from datagen group by
> TUMBLE(ts, INTERVAL '5' second), f_random");
> > bsTableEnv.executeSql("insert into print_table select * from " + table);
>
>
>
--
Best,
Benchao Li
有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
> >>> >
> >>> > 发件人: "Zhao,Yi(SEC)"
> >>> > 日期: 2020年8月13日 星期四 上午11:44
> >>> > 收件人: "user-zh@flink.apache.org"
> >>> > 主题: 如何设置FlinkSQL并行度
> >>> >
> >>> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
> >>> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
> >>> >
> >>> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
> >>> >
> >>> >
> >>>
> >
> >
>
--
Best,
Benchao Li
quot;insert into result_stuff select stuff_id,
> stuff_base_id, stuff_name from gen_stuff");
> }
> }
> ```
>
> 然而,mysql 每秒大约只多 1 条数据。如果按一条数据 20B 来算,写入速度是 200KB/s,这无法满足我的需求。。。
>
> 请问,是我哪里的配置有问题,还是有其它更好的写入数据库的方案,谢谢给出任何建议的人。
>
> 我使用的和 jdbc 有关的依赖如下:
>
> ```xml
>
> org.apache.flink
>
> flink-connector-jdbc_${scala.binary.version}
> ${flink.version}
>
>
> mysql
> mysql-connector-java
> 8.0.21
>
> ```
>
> (作为对比,在我的电脑上使用 datagen 生成数据,写入文件系统 sinker 的效率大约是 23MB/s)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
--
Best,
Benchao Li
流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。
>
> Benchao Li 于2020年8月20日周四 下午4:40写道:
>
> > Hi,
> >
> > 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。
> >
> > shizk233 于2020年8月20日周四 下午2:22写道:
> >
> > > Hi all,
> > >
> > > 请教一下,
例,其中的MapState应该也是一个实例,那么不同key下的
> Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗?
>
--
Best,
Benchao Li
2.底下的 records received等几个列都是0。怎么样才会统计?
>
>
>
--
Best,
Benchao Li
> 收件人:user-zh
> 主 题:Re: flink 1.11 order by rowtime报错
>
> 错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
--
Best,
Benchao Li
FROM
> hive_user_parquet
> GROUP BY
> id
> /*+ OPTIONS('streaming-source.enable'='true',
> 'streaming-source.consume-start-offset'='2020-08-18') */
> 通过分组统计好像是会报语法错误的,这是什么原因造成的呢
>
>
>
> 18579099...@163.com
>
--
Best,
Benchao Li
r.zookeeper.quorum' = ‘IP:port',
> 'connector.zookeeper.znode.parent' = '/hbase',
> );
>
>
> CREATE TABLE print_table (
> f0 STRING,
> f1 INT,
> f2 BIGINT,
> f3 BIGINT
> ) WITH (
> 'connector' = 'print'
> );
>
>
> insert into print_table
> select rowKey, cf.age, cf.area, tas
> from dimension
--
Best,
Benchao Li
共有 359 项搜索結果,以下是第 1 - 100 matches
Mail list logo