Re:Re:Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-28 文章 casel.chen
> >所以要看用户想要什么,你们想给用户开放到哪个程度? >至于是不是可以像flink sql kafka connector定义 `properties.*` >,这个是具体实现的方式,现在都不清楚你要做什么,先确定目标,再考虑实现。 > > >Thanks > >在 2022-12-27 13:24:38,"casel.chen" 写道: >> >> >>遇到用户添加自定义请求头Headers问题 >> >>如果自定义Headers是和内容相关

Re:Re:flink sql connector options如何支持Map数据类型?

2022-12-26 文章 casel.chen
nnector options中支持Map数据类型呢? >options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map > > > > >Thanks > >在 2022-12-17 10:20:29,"casel.chen" 写道: >>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector >>options中支持Map数据类型呢?

Re:Re: flink sql connector options如何支持Map数据类型?

2022-12-19 文章 casel.chen
看过了,不支持http source table,而且即使http lookup table也不支持map数据类型 在 2022-12-19 14:51:42,"Weihua Hu" 写道: >Hi, 你可以尝试使用独立开源的 http connector > >https://github.com/getindata/flink-http-connector > >Best, >Weihua > > >On Sat, Dec 17, 2022 at 10:21 AM ca

flink sql connector options如何支持Map数据类型?

2022-12-16 文章 casel.chen
我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector options中支持Map数据类型呢?

flink 1.16 lookup join重试策略问题

2022-12-07 文章 casel.chen
我们有场景会发生维表数据后于事实流表数据到达,使用flink 1.16 lookup join重试策略时,如果超过重试次数还没关联上会发生什么?待关联字段值都为null么?

Re:Re: 如何扩展flink sql以实现延迟调用?

2022-12-07 文章 casel.chen
joins >) >另外如果可以使用 lookup join 单边驱动关联并且不是所有数据都需要等待的话,可以尝试 lookup join 的延迟重试 >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup > >Best, >Lincoln Lee > > >casel.

Re:Re: 如何扩展flink sql以实现延迟调用?

2022-12-07 文章 casel.chen
le-delayed-retry-strategy-for-lookup > >Best, >Lincoln Lee > > >casel.chen 于2022年12月7日周三 11:52写道: > >> 有人能够解答一下吗? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2022-11-26 11:20:34,"casel.chen" 写道: >> >双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink >> sql实现?如果当前不支持,需要怎样扩展flink sql呢? >>

flink on native k8s模式调度能否根据节点磁盘和网络io指标进行调度?

2022-12-06 文章 casel.chen
flink on native k8s模式调度能否根据节点磁盘和网络io指标进行调度? 貌似现在只能根据cpu/内存剩余量进行调度,但如果新加一个节点会导致新作业全部的pod都部署到该节点上,造成该节点网络或磁盘IO飙升,这种情况有什么好的对策么?

Re:如何扩展flink sql以实现延迟调用?

2022-12-06 文章 casel.chen
有人能够解答一下吗? 在 2022-11-26 11:20:34,"casel.chen" 写道: >双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink >sql实现?如果当前不支持,需要怎样扩展flink sql呢?

flink sql是否支持延迟lookup join?

2022-12-06 文章 casel.chen
维表流数据晚于主表流数据到达甚至可能到达不了,所以想设置个5分钟等待窗口,关联上正常处理,关联不上发到另一个kafka topic,这种场景使用flink sql要如何实现?

Re:flink on k8s节点网络io飙高问题如何解决?

2022-12-06 文章 casel.chen
flink on native kubernetes如何使用 affinity 配置软互斥?即同一个作业的不同pod分布在不同的节点node上 在 2022-12-05 19:51:02,"casel.chen" 写道: >我司flink作业运行在k8s集群上,日前发现有一些k8s集群节点的网络io在某些时间段超过了告警阈值180MB/s,最多达到430MB/s,最少的只有4MB/s,导致新作业无法部署到网络负载高的节点上,哪怕cpu和内存还有很多剩余。 >目前我想的办法是利用节点亲和性手动从负载高的节点上迁移出那些耗网络

flink on k8s节点网络io飙高问题如何解决?

2022-12-05 文章 casel.chen
我司flink作业运行在k8s集群上,日前发现有一些k8s集群节点的网络io在某些时间段超过了告警阈值180MB/s,最多达到430MB/s,最少的只有4MB/s,导致新作业无法部署到网络负载高的节点上,哪怕cpu和内存还有很多剩余。 目前我想的办法是利用节点亲和性手动从负载高的节点上迁移出那些耗网络io高的作业pod到负载低的节点,但是过一段时间又会出现类似的问题,请问: 1. 有什么办法可以彻底消除这种网络负载不均衡问题么? 2. k8s能否根据pod网络io负载进行合理调度吗?

Re:回复:flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?

2022-12-05 文章 casel.chen
好吧,难怪我没找到设置开始消费位置的参数,谢谢! 在 2022-12-05 18:34:49,"JasonLee" <17610775...@163.com> 写道: >hi > > >Upsert-kafka 不支持指定消费者位置,默认是从 earliest 位置开始消费的,你可以自己修改代码支持 scan.startup.mode 参数。 > > >Best >JasonLee > > > 回复的原邮件 >| 发件人 | cas

flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?

2022-12-05 文章 casel.chen
flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?仿照kafka source表添加了 scan.startup.mode 参数会报非法参数

Re:回复: Re: 怎样从flink执行计划json生成StreamGraph?

2022-12-01 文章 casel.chen
gt;发送时间: 2022年11月30日 10:12 >收件人: user-zh@flink.apache.org >主题: Re: Re: 怎样从flink执行计划json生成StreamGraph? > >好吧,sql我具体不了解,我用的stream api比较多,我了解是stream >api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。 > >casel.chen 于2022年11月30日周三 00:16写道: >> &

Re:Re: flink sql作业无缝升级问题

2022-12-01 文章 casel.chen
之间可以做一些优化来缩短恢复时间。比如,把新作业先启动起来,申请好资源,同时停掉老作业,将做好的savepoint用来触发新作业的执行。 > >casel.chen 于2022年11月29日周二 08:38写道: > >> 线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb >> 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢? >> 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停

Re:Re: 怎样从flink执行计划json生成StreamGraph?

2022-11-29 文章 casel.chen
如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? 在 2022-11-29 10:07:40,"yidan zhao" 写道: >并不需要从执行计划json生成streamGraph呀~ >streamGraph提交之前直接转jobGraph。 > >casel.chen 于2022年11月28日周一 08:53写道: >> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教

flink sql有办法获取到rowkind元数据字段吗?

2022-11-28 文章 casel.chen
flink sql有办法获取到rowkind元数据字段吗?比如按rowkind进行case when处理或者过滤

flink sql作业无缝升级问题

2022-11-28 文章 casel.chen
线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢? 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?

flink sql接cdc数据源按最新数据统计问题

2022-11-28 文章 casel.chen
业务需求是mysql订单表按天按供应商实时统计交易金额,订单表会发生修改和删除,用flink sql要如何实现呢?开窗取最新一条记录再聚合吗?如果遇到delete记录会不会减去相应的price呢?试着写了如下flink sql不知道对不对 select s.biddate, s.supplier, sum(s.price) from ( select * from ( select biddate, supplier, price,

怎样从flink执行计划json生成StreamGraph?

2022-11-27 文章 casel.chen
源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教

如何扩展flink sql以实现延迟调用?

2022-11-25 文章 casel.chen
双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink sql实现?如果当前不支持,需要怎样扩展flink sql呢?

如何使用flink sql优雅的处理大量嵌套if-else逻辑

2022-11-23 文章 casel.chen
我有一个flink sql作业需要根据不同字段值满足不同条件来设置另一个字段值,还有一些嵌套if-else逻辑,这块逻辑不是固定的,业务方会过一段时间调整一次。 想请问如何使用flink sql优雅的处理嵌套if-else逻辑呢?我有想到使用drools规则引擎,通过udf来调用,不知道还有没有更好的办法?

Re:Re: flink作业提交运行后如何监听作业状态发生变化?

2022-11-23 文章 casel.chen
t;JasonLee <17610775...@163.com> 于2022年11月24日周四 09:59写道: > >> Hi >> >> >> 可以通过 Flink 的 Metric 和 Yarn 的 Api 去获取任务的状态(任务提交到 yarn 的话) >> >> >> Best >> JasonLee >> >> >> 回复的原邮件 >> | 发件人 | casel.chen | >> | 发送日

Re:Re:flink作业提交运行后如何监听作业状态发生变化?

2022-11-23 文章 casel.chen
es.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/ > > > >Thanks > > > > >在 2022-11-23 08:32:11,"casel.chen" 写道: >>请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?

Re:Re: 如何扩展flink sql以支持CTAS/CDAS语句?

2022-11-23 文章 casel.chen
是的,类似阿里云和腾讯云上面的功能 在 2022-11-23 10:02:09,"Shengkai Fang" 写道: >想问一下你想实现的功能是咋样的呢?是阿里云上的那种吗? > >Best, >Shengkai > >casel.chen 于2022年11月23日周三 08:29写道: > >> flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink >> sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!

flink作业提交运行后如何监听作业状态发生变化?

2022-11-22 文章 casel.chen
请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?

如何扩展flink sql以支持CTAS/CDAS语句?

2022-11-22 文章 casel.chen
flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!

如何扩展flink sql以支持CTAS/CDAS语句?

2022-11-22 文章 casel.chen
flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!

如何正确扩展jdbc connector以支持更多的数据库方言?

2022-11-22 文章 casel.chen
如何正确扩展jdbc connector以支持更多的数据库方言?我们目前的做法是拉下flink源码直接进行修改添加方言支持,有没有更优雅的方式来实现呢?

Re:Re: Flink CDC2.2.1 设置server id范围

2022-11-14 文章 casel.chen
22年10月31日 下午4:57,林影 写道: >> > >> > Hi, Leonard. >> > >> > 我也有类似的疑惑。 >> > >> > 有个线上的Flink Application之前配置的serverid 是 >> > 6416-6418,并行度之前是3,后来缩容的时候并行度改成2了,在这种场景下serverid的范围需要进行调整吗? >> >> 缩容并不需要的,你的case里只会用6416 和 6417这两个id

flink sql作业动态设置告警规则问题

2022-11-07 文章 casel.chen
配置了prometheus收集flink sql作业指标,现在想根据这些指标动态设置一些告警规则,请问要如何实现? 查了下prometheus告警需要配置alert rule之后重启才生效,有没有办法不重启呢?常规实现方案是什么?

Remote system has been silent for too long. (more than 48.0 hours)

2022-11-01 文章 casel.chen
今天线上 Flink 1.13.2 作业遇到如下报错,请问是何原因,要如何解决? 作业内容是从kafka topic消费canal json数据写到另一个mysql库表 2022-09-17 19:40:03,088 ERROR akka.remote.Remoting [] - Association to [akka.tcp://flink-metrics@172.19.193.15:34101] with UID [-633015504] irrecoverably failed.

flink sql client取消sql-clients-default.yaml后那些预置catalogs建议在哪里定义呢?

2022-10-31 文章 casel.chen
flink新版本已经找不到sql-clients-default.yaml文件了,那么之前配置的那些预置catalogs建议在哪里定义呢?通过初始化sql么?

Re:Re: Flink CDC2.2.1 设置server id范围

2022-10-31 文章 casel.chen
server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度? 在 2022-10-31 16:04:32,"Leonard Xu" 写道: >Hi, > >你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’. >另外 server-id 是全局唯一的,你需要确保下你使用的server-id

flink web ui cancel job时能否指定要不要生成savepoint?

2022-10-27 文章 casel.chen
flink web ui cancel job时能否指定要不要生成savepoint? 目前是直接cancel job without savepoint的。

flink cdc什么时候支持flink 1.15.x?

2022-10-10 文章 casel.chen
当前flinlk cdc master分支的snapshot版本最高支持到flink 1.14.4,尝试使用flink 1.15.2编译会出错,请问flink cdc什么时候支持flink 1.15.x?

Re:Re: flink cdc能否同步DDL语句?

2022-10-10 文章 casel.chen
DL语句? > >目前,社区的 Flink CDC,是可以读取到 binlog 中的 DDL 语句的,但是不会传递给下游,即下游无法感知 schema 的变化。 > >Xuyang 于2022年10月10日周一 16:46写道: > >> Hi, 目前应该是不行的 >> 在 2022-09-26 23:27:05,"casel.chen" 写道: >> >flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate >> table等 >>

flink sql cdc 2.2.1消费mysql binlog异常

2022-10-09 文章 casel.chen
flink sql cdc 2.2.1消费mysql binlog遇到如下异常,有谁遇到过?发现作业自己做了重试后过去了,想知道异常的root cause是什么?手动重起了作业重新消费后还是会出现。 Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at

如何实现flink作业失败实时通知告警?

2022-09-29 文章 casel.chen
当flink作业失败时如何第一时间发通知告警到相关方?现有方式 方式一:flink作业本身提供的rest api需要client不断去请求,不是实时不说还浪费资源,而且受网络抖动影响有时候还会超时获取不到,但不代表作业有问题。 方式二:通过作业暴露指标给promemtheus,因为prometheus是周期性(10s~20s) 来pull指标的,所以也达不到实时性要求。 flink作业能否在failure之前调用某个hook去通知相关方呢?如果要自己改的话,是要动哪个类呢?谢谢!

依赖flink cdc如何达到kafka connect with schema registry效果?

2022-09-29 文章 casel.chen
kafka connect with schema registry运行的时候会将表的schema信息注册到schema registry,同时消息以avro格式发到kafka topic 请问flink cdc要如何实现达到上述一样的效果?因为接下来我想依赖以下hudi博文提到的debezium入湖工具完成数据入湖 https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/

控制流方式能否改变作业ExecutionGraph?

2022-09-26 文章 casel.chen
我有一个数据同步场景是希望通过修改配置来实时动态修改数据同步的目标,例如使用flink cdc将mysql中的变更数据实时同步进kafka,如果后来业务又要求同一份数据再同步进mongodb的话,我是否可以通过修改同步配置来达到不停止原来作业来动态修改数据同步的目标(由一个变多个)?又或者是flink cdc整库同步mysql变更数据到kafka一个topic,后来业务又要求按表划分topic,这种能否同样通过修改配置来实现呢?

Re:Re: flink的消费速率是否可以调整

2022-09-26 文章 casel.chen
kafka consumer config里面有一些配置参数可以达到限速功能,例如 max.partition.fetch.bytes fetch.max.bytes max.poll.records 详情可以参考 https://kafka.apache.org/24/documentation.html#consumerconfigs 在 2022-09-26 23:27:30,"yidan zhao" 写道: >应该不行吧,kafka client本身就没有限速的功能。 > >Jason_H 于2022年9月26日周一 10:17写道: >> >>

Re:Re: flink cdc + kafka场景下增加kafka分区数问题

2022-09-26 文章 casel.chen
是的,消息key是由 `库名+表名+主键值` 组成的 在 2022-09-26 23:29:18,"yidan zhao" 写道: >之前是如何实现的,通过 kafka 的record key? > >casel.chen 于2022年9月26日周一 23:21写道: >> >> flink cdc >> 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?

flink cdc能否同步DDL语句?

2022-09-26 文章 casel.chen
flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate table等

flink cdc同只步表的schema到下游kafka topic吗?

2022-09-26 文章 casel.chen
flink cdc同只步表的schema到下游kafka topic吗?类似于confluent kafka schema registry,在下游kafka新建一个_schema的topic,key是表名,value是avro格式的schema。如果可以的话要如何实现?

flink cdc + kafka场景下增加kafka分区数问题

2022-09-26 文章 casel.chen
flink cdc 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?

咨询多条flink cdc作业消费同一个库下不同表优化方案

2022-09-25 文章 casel.chen
目前业内针对多条flink cdc作业消费同一个库下不同表为了防止对数据库方产生很大查询压力,一般都是发到kafka,但这样的话下游作业只能获取到实时增量数据进行处理,如果下游作业需要获取全量数据处理的话,还得再回过头来使用cdc connector,但这样会产生上述副作用。我在想作业是否能够在获取到全量数据之后做一个checkpoint,接下来就可以改使用kafka connector? 续接的点是binlog offset,即cdc connector消费到的binlog offset要续接上kafka connector某个消息带的binlog

flink cdc作业是否支持将湖表作为源表source?

2022-09-25 文章 casel.chen
多条flink cdc作业场景直接接mysql会对数据库造成很大压力,一种办法是flink cdc下游接kafka,但这种只适用于多个下游作业只需要消费增量数据情况,如果多个下游作业需要消费存量+增量的话是不是可以考虑使用hudi/iceberg这种湖表替代kafka,像普通mysql一样flink cdc在全量快照阶段先查询湖表已有数据,再在增量快照阶段依赖湖表支持streaming query能力获取到实时全量数据?

Re:Re: flink实时双流驱动join问题

2022-09-22 文章 casel.chen
r流可以比较短些?) >3. order流和user流的数据规模/state size规模大概可以到什么级别? > >casel.chen 于2022年9月17日周六 10:59写道: > >> 请教一个flink实现实时双流驱动join问题: >> >> >> order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键) >> user cdc流字段:user_id, user_name, user_phone, user_a

flink hybrid source问题

2022-09-19 文章 casel.chen
我有一个flink实时计算场景是需要先从MaxCompute读取一张表的存量数据,再从相应的kafka topic读取增量数据,一并进行计算处理。 看了一下需要用到hybrid source,目前最新flink社区版提供了Kafka/Hive/File Source,其他数据源的source是需要自己开发吗?社区有没有一个贡献source的地方? 有没有介绍如何自定义基于新版source架构的source文章或博客呢?谢谢!

flink实时双流驱动join问题

2022-09-16 文章 casel.chen
请教一个flink实现实时双流驱动join问题: order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键) user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键) 关联结果流字段:order_id, order_status, order_time, user_name, user_phone, user_address(order_id是主键)

如何监控flink sql作业端到端延迟?

2022-09-16 文章 casel.chen
线上运行了多个flink sql作业,现在想监控端到端延迟。我配置了 metrics.latency.interval=3 metrics.latency.granularity=operator metrics.latency.history-size=128 参数,延迟指标已经发到了prometheus,看到该指标有50、75、95、98,99,999分位线,另外还有operator_id和operator_id_subtask_index,细到了算子子task级别。 1. 想知道怎样根据这些暴露指标统计出该flink

flink作业生成保存点失败

2022-08-29 文章 casel.chen
有一个线上flink作业在人为主动创建保存点时失败,作业有两个算子:从kafka读取数据和写到mongodb,都是48个并行度,出错后查看到写mongodb算子一共48个task,完成了45个,还有3个tasks超时(超时时长设为3分钟),正常情况下完成一次checkpoint要4秒,状态大小只有23.7kb。出错后,查看作业日志如下。在创建保存点失败后作业周期性的检查点生成也都失败了(每个算子各有3个tasks超时)。使用的是FileStateBackend,DFS用的是阿里云oss。请问出错会是因为什么原因造成的? +5 [2022-08-29 15:38:32] content:

Re:Re:Flink Native Kubernetes Resources Requests and Limits

2022-08-27 文章 casel.chen
g/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-taskmanager-cpu-limit-factor > > >On 08/5/2022 09:24,casel.chen wrote: >我通过flink native kubernetes部署flink >1.13.2作业到k8s上发现资源实际使用量远小于请求量,特别是CPU,启动的时候CPU消耗多一些,运行一段时间后CPU消耗显著降低,如果设置CPU值较小的话又会造成作业启动慢的问题,查了一下当前实现中将资源requests

k8s环境下application模式flink作业HA的原理是什么?

2022-08-27 文章 casel.chen
k8s环境下开启作业HA后,如果JM挂了会重新拉起一个新的JM,想知道这个原理是什么?重启的作业会从上一个checkpoint位置重新消费吗?

Re:Re:flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?

2022-08-22 文章 casel.chen
quot;,"recordVersion":"123","sign":"123","synModifyTime":null,"merName":null,"bankName":null,"bankRespDesc":null,"bagentId":null,"bankId":null,"cashRespDesc":null,"reqDate":null,&qu

flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?

2022-08-20 文章 casel.chen
flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka? flink cdc获取的是debezium格式记录(用的是 JsonDebeziumDeserializationSchema),要如何转换成canal json格式输出呢?有没有例子或关键代码展示?谢谢!

flink sql支持监听单个文件内容变化吗?

2022-08-18 文章 casel.chen
flink sql支持监听单个文件内容变化吗?文件中每一行是一条记录,对外输出的模式可以全量或者变量。

Re:Re: flink on k8s作业失败后如何自动释放资源?

2022-08-15 文章 casel.chen
native模式,发现作业失败后会自动重试几次,最后部署和pod消失 在 2022-08-14 16:55:48,"yu'an huang" 写道: >你的部署模式是native还是standalone,正常作业失败是会释放资源的,可以提供更多信息吗? > > > >> On 14 Aug 2022, at 9:55 AM, casel.chen wrote: >> >> flink on k8s作业失败后现在默认是会一直占用资源,请问要如何配置使其自动释放资源? >

flink on k8s作业支持弹性扩缩容吗?

2022-08-13 文章 casel.chen
flink on k8s作业能否在给定资源范围内自动根据上游流量大小实现弹性扩缩容?例如增加并发度和TaskManager数量等

flink on k8s作业失败后如何自动释放资源?

2022-08-13 文章 casel.chen
flink on k8s作业失败后现在默认是会一直占用资源,请问要如何配置使其自动释放资源?

Flink Native Kubernetes Resources Requests and Limits

2022-08-04 文章 casel.chen
我通过flink native kubernetes部署flink 1.13.2作业到k8s上发现资源实际使用量远小于请求量,特别是CPU,启动的时候CPU消耗多一些,运行一段时间后CPU消耗显著降低,如果设置CPU值较小的话又会造成作业启动慢的问题,查了一下当前实现中将资源requests恒等于limits,请问该如何分别设置cpu和内存的requests和limits以提高资源使用效率呢?谢谢!

flink作业延迟监控

2022-07-18 文章 casel.chen
想实现flink sql作业延迟监控,例如flink sql作业将kafka数据写入mysql,记kafka记录中的事件时间为T0,发到kafka时间是T1,写入mysql的时间为T2,现要统计如下时间差(延迟) 1. T2 - T1 :flink sql作业延迟 2. T2 - T0 :端到端延迟,包括flink sql作业延迟和数据写入kafka延迟 请问: 1) 要如何暴露这2个时间差作为metrics? 2) 中间算子的处理时长能暴露吗?

如何实现flink作业失败告警功能

2022-07-18 文章 casel.chen
想实现flink作业一旦失败就立马告警功能,请问要如何实现?是否有Listener可以进行注册?

Flink消费kafka实时同步到MongoDB出现丢数据

2022-06-25 文章 casel.chen
mysql cdc -> kafka -> mongodb 写了一个flink 1.13.2作业从kafka消费mysql整库变更topic并实时同步写入mongodb,也开启了checkpoint,但实测下来发现从savepoint恢复和从groupOffsets恢复会造成数据丢失,请问这应该怎么排查?代码仓库地址:https://github.com/ChenShuai1981/mysql2mongodb.git 我的MongodbSink有实现CheckpointedFunction,并在snapshotState方法中会等待所有子线程完成写mongodb。

flink kubernetes application模式下作业镜像问题

2022-05-05 文章 casel.chen
使用flink kubernetes application模式运行flink作业需要将作业打包进镜像,这对于有大量用户个性化作业场景使用不是很方便,需要维护很多作业镜像版本。有没有办法在执行时引用到镜像外部的作业jar包或python文件,例如HDFS或者阿里云OSS。或者有其他workaround办法,之前听过init-container,但不知道具体要怎么使用,还请赐教!谢谢!

Flink MySQL CDC 注册 schema registry 问题

2022-04-22 文章 casel.chen
Hi, 我想使用 Flink MySQL CDC Connector 以 DataStream 方式消费 MySQL Binlog 输出变更数据到下游kafka topic (1),同时监听database schema change事件,将最新的schema数据输出到下游另一个kafka topic (2),又或者直接注册schema到 confluent / apicurio schema registry,查了一下flink cdc官方文档[1],并没有这方面的信息。请问应该怎么实现呢?有相关文档或例子么?谢谢! [1]

flink cdc 时间格式和时区问题

2022-04-21 文章 casel.chen
我在使用flink cdc 2.2.0获取mysql数据变更, mysqlSource设置了 .serverTimeZone("Asia/Shanghai") 发现mysql timestamp 类型的数据在mysql workbench里显示的是 "2021-06-24 16:26:47",通过JsonDebeziumDeserializationSchema解析后得到的json string串是

Re:Flink Kubernetes Operator

2022-04-14 文章 casel.chen
The deployment 'cert-manager-webhook' shows Failed to pull image "quay.io/jetstack/cert-manager-webhook:v1.7.1": rpc error: code = Unknown desc = Error response from daemon: Get "https://quay.io/v2/": net/http: TLS handshake timeout 在 2022-04-14 15:40:51,&qu

Flink Kubernetes Operator

2022-04-14 文章 casel.chen
按照其官方文档[1]尝试在mac本地的minikube上运行Flink Kubernetes Operator,结果抛下面的连接错误: $ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set image.repository=apache/flink-kubernetes-operator WARNING: Kubernetes configuration file is group-readable. This is insecure.

flink jdbc connector不支持source

2022-04-09 文章 casel.chen
现有一个场景是需要用flink一次性批量将某个mysql库下指定表(不同schema)同步到hudi表里面,查了一下官网flink jdbc connector [1] 文档说明只支持sink,不支持source。请问社区有支持计划吗?如果没有的话,自己要如何开发,可以给个例子吗?谢谢! [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/

Re:flink on k8s场景,大家一般如何解决访问hdfs的问题呢。

2022-03-29 文章 casel.chen
我们是直接使用云存储,像阿里云的oss,没有再搭建hadoop集群。如果flink on k8s的确需要访问hadoop的话,是需要打包hadoop发行包在镜像里面的,配置好core-site.xml, hdfs-site.xml等 在 2022-03-30 12:01:54,"yidan zhao" 写道: >如题,是需要打包hadoop client到镜像中吗。

Re:Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 casel.chen
用cdc join也需要将事实表缓存下来才能实现吧,这就是普通的regular join,优点是双流驱动,缺点是需要缓存两边的数据,状态会变得很大,建议使用带ssd的rocksdb增量状态后端。 业务上如果可以接受超过一定时间范围不用关联的话,还可以设置state ttl 进一步使状态大小可控。 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道: >Cdc join > >> 2022年3月21日 14:01,JianWen Huang 写道: >> >>

org.apache.flink.runtime.rpc.exceptions.FencingTokenException

2022-02-16 文章 casel.chen
Hello, 我有一个Flink 1.13.2 on native kubernetes application作业遇到如下异常,会是什么原因造成的? Starting kubernetes-application as a console application on host dc-ads-ptfz-nspos-sib-trans-sum-6d9dbf587b-tgbmx. ERROR StatusLogger Reconfiguration failed: No configuration found for '135fbaa4' at 'null' in 'null'

Re:Re:flink sql jdbc sink事务提交问题

2022-02-16 文章 casel.chen
如果mysql配置不是auto commit,那么事务是在哪一步提交呢? 在 2022-02-16 10:24:39,"Michael Ran" 写道: >jdbc 连接 mysql 的driver 记得默认就是AutoCommit。phoenix不太清楚 >在 2022-02-15 13:25:07,"casel.chen" 写道: >>最近在扩展flink sql jdbc >>connector以支持phoenix数据库,测试debug的时候发现数据能够通过Phoen

flink sql jdbc sink事务提交问题

2022-02-14 文章 casel.chen
最近在扩展flink sql jdbc connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。

[statefun] How to make the input/output topics live in different kafka clusters for flink stateful functions

2022-02-12 文章 casel.chen
Hi, I am newbe of flinkd stateful functions. And just want to ask a question: How to make the input/output topics live in different kafka clusters? Thanks!

flink mysql cdc注册confluent schema registry

2022-01-29 文章 casel.chen
我想利用flink mysql cdc输出变更数据到kafka,同时将table schema注册到confluent schema registry,以模拟debezium kafka connect效果[1]。还请指教要如何下手呢?谢谢! [1] https://blog.csdn.net/OldDirverHelpMe/article/details/107881170

flink算子级别资源使用设置

2022-01-15 文章 casel.chen
flink是否支持算子级别资源使用设置? 如果是flink sql 能否根据生成的Graph配置细粒度资源配置?

flink window支持回撤流吗?

2022-01-15 文章 casel.chen
Tumbling / Hopping / Session / Cumulate 这些 window 支持数据来源是回撤流吗?

cumulate window可以在retract流上使用吗?

2022-01-10 文章 casel.chen
cumulate window只能在append流上使用吗?可以在retract流或upsert流上使用吗?

flink sql动态累计窗口实现问题

2022-01-10 文章 casel.chen
听了FFA2021快手Flink SQL分享有讲到动态累计窗口实现,想问一下Flink开源社区是否有相应实现?或者有相应的JIRA?我们也有这样的使用场景,如果暂时没有的话要如何自己实现?特别是自定义sql语法这块,有没有一些相关的教程?谢谢!

cumulate窗口不支持偏移offset参数吗?

2022-01-10 文章 casel.chen
实时统计需求是每隔一小时计算周一到周日每周累计的交易量和总金额,拟使用flink sql cumulate window计算,假定作业启动时间是周四,如果不加offset偏移量参数的话,统计的是这周四到下周四的交易量和总金额,但我查了一下flink官方文档对cumulate window[1]介绍只有两个参数,一个是step,另一个是size,并没有看到offset,请问有没有办法实现?谢谢!

FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?

2022-01-07 文章 casel.chen
问一下FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?二者功能是否重复?还是侧重点不同? https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage

Re:Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 casel.chen
mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 在 2022-01-06 20:43:00,"Benchao Li" 写道: >这个问题可以用mini-batch[1]来解决呀 > >[1] >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation > >casel.

flink作业延迟统计如何实现?

2021-12-28 文章 casel.chen
希望统计flink sql作业延迟,包括以下三个指标: 1. kafka消息延迟 (消息进到kafka时间 - 消息事件本身发生时间) 2. flink作业本身延迟 (消息处理完时间 - 消息开始处理时间) 3. 端到端延迟 (消息处理完时间 - 消息事件本身发生时间) 目前Flink 1.13.2是不是只能通过LatencyMarker获取到flink作业本身延迟? 开启的话会有性能影响吗? 据说Flink 1.14 新的source/sink api能够暴露一些metrics,但不清楚要具体怎么使用?谢谢!

Flink on Native K8S作业节点调度问题

2021-12-27 文章 casel.chen
k8s集群有2个节点配置了SSD盘,其他节点是普通硬盘,希望实现配置了rocksdb状态后端类型的作业部署到这2个ssd节点,同时希望其他非rocksdb状态后端作业不会被调度到这2个ssd节点 请问:flink 1.13.2有办法实现吗?如果能实现的话,具体应该怎么操作呢?最好给个具体例子,谢谢!

Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
大大减少输出到kafka的消息的数量 > >casel.chen 于2021年12月23日周四 08:15写道: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持

Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
-flush.max-rows >参数 > >[1] : >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ > > >Zhiwen Sun > > > >On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的

Re:Flink SQL Calcite 解析出错

2021-12-24 文章 casel.chen
eventInfo_eventTime 我猜测是 BIGINT 类型的吧? order by | range 需要用到 timestamp 类型,需要用计算列转换一下 At 2021-12-24 16:38:00, "Pinjie Huang" wrote: >我的原SQL: >CREATE TABLE consumer_session_created >( >consumer ROW (consumerUuid STRING), >clientIp STRING, >deviceId STRING, >eventInfo ROW <

Flink CDC 2.0 整库同步如何实现?

2021-12-24 文章 casel.chen
看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!

Re:Re:flink on native k8s模式下CPU使用率不高问题

2021-12-23 文章 casel.chen
ource传入参数来替换request > > > > > > > > > > > > > > > > > >在 2021-12-18 09:15:06,"casel.chen" 写道: >>所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 >>kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu >>request和limit都是一样的。

请教flink sql作业链路延迟监控如何实现

2021-12-22 文章 casel.chen
想问一下flink sql作业链路延迟监控如何实现? 我们的flink sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储 想监控如下三种延迟,目前有什么办法实现吗?会有相应的metrics暴露出来吗?目前我们在用的flink版本是1.13.2 1. 端到端的延迟 2. kafka本身的延迟 3. flink处理的延迟

flink sql回撤流sink优化问题

2021-12-22 文章 casel.chen
flink sql中aggregate without window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? 例如有下面binlog cdc购买数据(订单购买金额会更新): orderid. categorydt

Re:Re: 回撤流优化

2021-12-20 文章 casel.chen
mini-batch只是攒批后再执行,执行数据量并没有减少。而在我这个场景下是需要以攒批内再根据key聚合取最新的结果,执行数据量会大大减少。mini-batch应该还做不到吧? 在 2021-12-16 17:15:45,"Jingsong Li" 写道: >理论上mini-batch就可以优化回撤流。 > >目前是join没有支持mini-batch。 > >On Thu, Dec 16, 2021 at 5:12 PM casel.chen wrote: >>

Re:Re:flink on native k8s模式下CPU使用率不高问题

2021-12-20 文章 casel.chen
> > > > > > > > > > >在 2021-12-18 09:15:06,"casel.chen" 写道: >>所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 >>kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu >>request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CP

flink on native k8s模式下CPU使用率不高问题

2021-12-17 文章 casel.chen
所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CPU资源要远远高于作业实际运行起来的CPU资源,二者可能相差近5倍左右。如果设置的cpu较低的话,作业启动需要花费很长时间。 如何才能够提高作业CPU使用率呢?可以直接修改k8s

紧急bugfix的那些flink jar包在maven中心仓库上找不到

2021-12-16 文章 casel.chen
例如 flink 1.13.5,这些jar包有上传到maven中心仓库吗?我没有看到,编译的时候出错了。

双流窗口内join用flink sql实现的语法是什么?

2021-12-16 文章 casel.chen
每隔5分钟join来自两条流的数据,用flink sql实现的写法是什么? 需要先join再窗口计算还是可以直接窗口内join? flink版本是1.13

<    1   2   3   4   >