flink作业版本管理实现方案探讨

2021-01-19 文章 casel.chen
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好? 一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。 用数据库存的话作业文件比较割裂,像文本文件可以直接存表,但像jar包的话得存分布式文件系统,同时在数据表中记录文件id。我个人更倾向于用git,不知道这里会不会有什么坑?还请做过的朋友给个建议,谢谢!

flink作业版本管理实现方案

2021-01-19 文章 casel.chen
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好? 一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。

flink yarn application提交作业问题

2021-01-19 文章 casel.chen
使用了如下命令来提交flink作业到yarn上运行,结果出错。如果job jar路径改成本地的就没有问题。我已经将 flink-oss-fs-hadoop-1.12.0.jar 放到flink lib目录下面,并且在flink.conf配置文件中设置好了oss参数。试问,这种作业jar在远端的分布式文件系统flink难道不支持吗? ./bin/flink run-application -t yarn-application \ -Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \

Re:flink yarn application提交作业问题

2021-01-19 文章 casel.chen
./bin/flink run-application -t yarn-application \ -Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \ hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar 这种命令执行方式是可以执行的。 在 2021-01-20 10:21:32,"casel.chen" 写道: >使用了如下命令来提交flink作业到yarn上运行,结果出

flink yarn application 提交任务出错

2021-01-19 文章 casel.chen
今天尝试使用yarn application模式(带yarn.provided.lib.dirs参数),将$FLINK_LIB目录下的jar包上传到了hdfs,结果报了如下的错,是少了哪个jar包或配置文件吗? org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at

flink sql cdc到kafka,如何提高sink并行度?

2021-06-14 文章 casel.chen
flink sql cdc发到kafka,显示下游写kafka并行度只有1,有没有办法提高并行度呢? 显示job-parallelism, table.exec.resource.default-parallelism, parallelism.default 都是 24,但execution graph显示parallelism还是1,我设置了pipeline.operator-chaining=false

Re:Re:使用mysql-cdc 的scan.startup.mode = specific-offset的读取模式,运行一段时间后,报错

2021-06-07 文章 casel.chen
这个问题很严重啊,生产线上可不敢这么用,丢失部分数据是不能接受的。社区什么时候能支持 GTID 呢?官方网档上有写么? 在 2021-06-07 18:40:50,"董建" <62...@163.com> 写道: > > > >我也遇到了这种情况,可能是你们的db做了主从切换。 >因为binlog每台服务器的pos都不一样。 >mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。 >我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。 > > > > > > > > > > > > > >

Re:回复: Flink 维表延迟join

2021-06-07 文章 casel.chen
双流interval join是否可行呢? 在 2021-06-07 16:35:10,"Jason Lee" 写道: > > >我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL >维表延迟Join的问题了吗? > > >有解决方案的小伙伴能分享下嘛? >| | >JasonLee >| >| >jasonlee1...@163.com >| >签名由网易邮箱大师定制 > > >在2021年02月25日 14:40,Suhan 写道:

如何获取flink sql的血缘关系?

2021-06-07 文章 casel.chen
如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!

flink sql cdc数据同步至mysql

2021-06-08 文章 casel.chen
flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

flink sql维表延迟join如何实现?

2021-06-09 文章 casel.chen
延迟join主要是为了解决维表数据后于事实表数据到达问题。java代码可以实现,那flink sql这块能否通过sql hint解决呢?有没有示例?

flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
需求背景: 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink SQL作业采用的是1.13开发的。 而让平台支持不同Flink版本,我能想到有三种实现方案: 1. 平台直接调用 flink run 或 flink run-application 提交作业 优点:实现简单,每个flink版本都会带这个shell脚本

flink on 原生kubernetes支持批处理吗?

2021-06-12 文章 casel.chen
我们知道flink on 原生kubernetes当前是用k8s deployment运行一个流作业的,请问会用k8s job运行一个批作业吗?

flink sql cdc支持额外字段问题

2021-06-10 文章 casel.chen
flink sql cdc写入kafka,期望kafka消息带上数据库database,表table,变更时间和变更标记+I/-U/+U/-D这几个特殊字段,目前来看是做不到的,对吗?

Re: flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
好的,我先尝试使用一下,谢谢! 在 2021-06-13 10:43:12,"Jeff Zhang" 写道: >如果不是native k8s的话,现在已经支持了,用remote模式就可以, >https://www.yuque.com/jeffzhangjianfeng/gldg8w/engh3w >native k8s的话,社区正在做,这是PR: https://github.com/apache/zeppelin/pull/4116 > > >casel.chen 于2021年6月

Re:flink sql cdc数据同步至mysql

2021-06-12 文章 casel.chen
。 > > >在 2021-06-11 18:57:36,"casel.chen" 写道: >>我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。 >>上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理? >>用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数 >> >> >> >> >> >>

Re:Re: flink sql cdc数据同步至mysql

2021-06-12 文章 casel.chen
请问 flink sql cdc 场景下如何增大下游sink端并行度? 我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。 而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度 初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。 以下是作业内容: --

Re:Re: flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
有版本。钉钉群:32803524 > >casel.chen 于2021年6月12日周六 下午5:56写道: > >> 需求背景: >> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink >> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink >> SQL作业采用的是1.13开发的。 >> >> >> 而让平台支持不同Flink版本,我能想

Flink state evolution with avro

2021-06-10 文章 casel.chen
Is there any live code example about flink state evolution with avro? Thanks!

Re:Re: flink sql cdc数据同步至mysql

2021-06-10 文章 casel.chen
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢! 1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka topic开多个分区 2. 再从kafka消费,通过flink sql同步到最终mysql库 在 2021-06-08 19:49:40,"Leonard Xu" 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.parallism

应用初始化HiveCatalog出错 "URI is not hierarchical"

2021-06-10 文章 casel.chen
我在spring boot应用中使用HiveCatalog展示库和表信息,通过传入hive参数初始化HiveCatalog时抛如下错误 hiveCatalog = new HiveCatalog(hiveConfig.getCatalogName(), hiveConfig.getDefaultDatabase(), hiveConfig.getHiveConfDir()); hiveCatalog.open(); 在spring boot应用所在机器上我只在/opt/hive/conf目录下准备了hive-site.xml,还缺什么配置么? 2021-06-11

flink hudi写oss文件失败报No FileSystem for scheme "oss"

2021-06-18 文章 casel.chen
hadoop 2.9.2, flink 1.12.2, hudi 0.9.0-SNAPSHOT core-site.xml里面配置了oss相关信息,本地启flink cluster,执行flink sql client创建表,写数据和查询都没问题。 改成在项目中flink sql作业,打包成fat jar以local方式运行,项目中引用了 flink-oss-fs-hadoop,但程序报了如下错误 Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "oss"

如何通过反射获取connector是否支持lookup?

2021-06-17 文章 casel.chen
获取connector是否支持source和sink只要看 XXXDynamicTableFactory 是否实现 DynamicTableSourceFactory和DynamicTableSinkFactory接口,但在source情况下如何进一步判断它是否支持lookup呢? public DynamicTableSource createDynamicTableSource(Context context)

flink sql cdc如何获取元数据

2021-06-22 文章 casel.chen
flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。 create table xxx_tbl ( k_op varchar, -- 操作类型 k_database varchar, -- 数据库名 k_table varchar, -- 表名 k_ts. BIGINT, -- binlog产生时间 idBIGINT, name. varchar ) with ( 'connector' = 'mysql-cdc', . 'meta.fields-prefix' = 'k_'

flink 1.12如何实现window topN功能

2021-06-22 文章 casel.chen
官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?

Re:回复:flink 1.12如何实现window topN功能

2021-06-22 文章 casel.chen
请不要截图哦 在 2021-06-23 09:47:46,"杨光跃" 写道: 1.12也支持的 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月23日 09:45,casel.chen 写道: 官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?

Re:回复:flink 1.12如何实现window topN功能

2021-06-22 文章 casel.chen
1.12 Documentation: Queries >| | >杨光跃 >| >| >yangguangyuem...@163.com >| >签名由网易邮箱大师定制 >在2021年6月23日 10:09,casel.chen 写道: >请不要截图哦 > > > > > > > > > > > > > > > > >在 2021-06-23 09:47:46,"杨光跃" 写道: > >1.12也支持的

flink多表延迟关联问题

2021-06-22 文章 casel.chen
上游有多个流需要经过flink作近实时关联,数据流有先到和后到问题,为了尽可能关联上,有什么办法可以实现延迟关联吗?每个流开个1分钟窗口再关联相同key的元素可以吗?用flink sql如何实现?谢谢!

Re:回复:flink 1.12如何实现window topN功能

2021-06-22 文章 casel.chen
wStart, FROM source_event > group by TUMBLE(ts, INTERVAL '10' SECOND), 主键 > > >2. 根据上一步的结果取top5 >select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY >处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5 > >| | >杨光跃 >| >| >yangguangyuem...@163.

Re:Re: flink多表延迟关联问题

2021-06-22 文章 casel.chen
达一定时间再发出来 >datastream可以用窗口+side output来实现,但是sql没有side output实现,所以存在丢数据的情况 > >> 2021年6月23日 上午8:27,casel.chen 写道: >> >> 上游有多个流需要经过flink作近实时关联,数据流有先到和后到问题,为了尽可能关联上,有什么办法可以实现延迟关联吗?每个流开个1分钟窗口再关联相同key的元素可以吗?用flink >> sql如何实现?谢谢! >

Re:Re: flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2021-06-16 文章 casel.chen
Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql cdc写mysql遇到数据同步跟不上数据写入速度问题,何时能支持像mysql并行复制这种机制呢? 在 2021-06-16 17:27:14,"Leonard Xu" 写道: >看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception, >可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。 > >祝好, >Leonard > >> 在

Re:Re:flink sql cdc数据同步至mysql

2021-06-11 文章 casel.chen
引用 Leonard Xu大佬之前的回答: > flink 1.13的jdbc connector新增 sink.parallism > 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么? 这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证 sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致, 否则可能导致数据乱序。 这个社区也在从 plan

Re:Re: 应用初始化HiveCatalog出错 "URI is not hierarchical"

2021-06-11 文章 casel.chen
hive-stie.xml不在classpath下面,而是通过配置文件加载的: hiveConfig.getHiveConfDir() 例如 /opt/hive/conf 这个目录下有hive-site.xml 在 2021-06-11 13:59:20,"Rui Li" 写道: >你好, > >看一下jar里面是不是有hive-site.xml文件呢? > >On Fri, Jun 11, 2021 at 10:37 AM casel.chen wrote: > >> 我在spri

Re:Re: flink sql维表延迟join如何实现?

2021-06-11 文章 casel.chen
有例子吗?或者相关资料连接也行 在 2021-06-11 12:40:10,"chenchencc" <1353637...@qq.com> 写道: >使用事件时间就可以延时 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:flink sql cdc数据同步至mysql

2021-06-11 文章 casel.chen
等于业务down掉。 >2、如果只是简单的insert into xxx select >xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。 > > >在 2021-06-08 14:05:17,"casel.chen" 写道: >>flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc >>connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同

Re:Re:Re:Re:flink sql cdc数据同步至mysql

2021-06-11 文章 casel.chen
你的场景是怎样的呢? > > > > >另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es >connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc >connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash >shuffle,确保相同pk的记录发到同一个sink task。 > > >在 2021-

Re:Re:flink sql写mysql中文乱码问题

2021-05-18 文章 casel.chen
'username' = 'mysqluser', 'password' = 'mysqluser', >'table-name' = 'jdbc_sink') >在 2021-05-18 11:55:46,"casel.chen" 写道: >>我的flink sql作业如下 >> >> >>SELECT >>product_name, >>window_start, >>window_end, >>CAST(SUM(trans_amt)ASDECIMAL(24,2)

请问在native kubernetes上如何运行Flink History Server?

2021-05-07 文章 casel.chen
请问在native kubernetes上如何运行Flink History Server? 有没有相应的文档?

flink on native kubernetes要如何修改Logging配置?

2021-05-07 文章 casel.chen
我用native kubernetes方式部署flink session cluster,想修改某个包下的日志级别,如果直接修改configmap下的log4j-console.properties再重新部署是能生效的,但是通过命令行 (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) 起flink session cluster会将之前的修改冲掉,有什么办法可以保留下之前的修改吗?是否有命令行启动参数可以指定自定义的logging配置?谢谢!

Re:flink sql怎样将change log stream转换成append log stream?

2021-05-17 文章 casel.chen
没有人知道吗? 在 2021-05-13 17:20:15,"casel.chen" 写道: flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

Re:flink sql如何修改执行计划?

2021-05-17 文章 casel.chen
没有人知道吗? 在 2021-05-13 08:19:24,"casel.chen" 写道: >flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 >我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink sql源表定义字段列表问题

2021-05-17 文章 casel.chen
采用flink sql定义源表时,哪些connector支持提供部分字段,哪些connector必须要提供全量字段呢? 这边常用的connector主要有kafka, jdbc, clickhouse和mysql-cdc。 cdc是不是必须要提供对应表的全量字段呢?如果上游mysql业务表新增了字段,flink sql作业会不会出错? kafka表定义是否支持部分字段?

flink sql写mysql中文乱码问题

2021-05-17 文章 casel.chen
我的flink sql作业如下 SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM( mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT

flink sql如何修改执行计划?

2021-05-12 文章 casel.chen
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。 我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!

flink sql怎样将change log stream转换成append log stream?

2021-05-13 文章 casel.chen
flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

flink sql运行在阿里云k8s用oss作为checkpoint存储介质出错

2021-05-21 文章 casel.chen
flink sql运行在阿里云k8s用oss作为checkpoint存储介质,在作业启动过程中出错,请问这个NoSuchKey是指什么?flink在获取checkpoint作restore吗? 2021-05-21 10:56:10,278 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_source]], fields=[id,

flink sql支持创建临时函数吗?

2021-05-21 文章 casel.chen
如下 CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS ( CASE WHEN tenure < 1 THEN "analyst" WHEN tenure BETWEEN 1 and 3 THEN "associate" WHEN tenure BETWEEN 3 and 5 THEN "senior" WHEN tenure > 5 THEN "vp" ELSE "n/a" END ); SELECT name ,

flink sql支持Common Table Expression (CTE)吗?

2021-05-21 文章 casel.chen
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as ( SELECT AVG(salary) as

flink sql cdc并行度问题

2021-05-24 文章 casel.chen
flink sql作业:消费mysql binlog将数据同步到 mongodb 问题: 1. mysql-cdc connector只能设置成一个并行度吗? 2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?

Re:Re:Re:Re:flink sql写mysql中文乱码问题

2021-05-24 文章 casel.chen
jdbc:mysql://host:3306/datav_test?useUnicode=true=utf8 本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢! 在 2021-05-19 17:52:01,"Michael Ran" 写道: > > > >数据库的字段字符编码 > > > > > > > > > > > > > > >在 2021-05-18 18:19:31,

Re:flink sql写mysql中文乱码问题

2021-05-25 文章 casel.chen
LE’F%')]) DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat 在 2021-05-25 10:40:46,"casel.chen" 写道: >数据库字符编码设置如下 > >

怎么避免flink sql cdc作业重启后重新从头开始消费binlog?

2021-06-02 文章 casel.chen
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' = 'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key error,有什么办法可以避免吗? CREATE TABLE `mysql_source` ( `id` STRING, `acct_id` STRING, `acct_name` STRING, `acct_type` STRING, `acct_bal` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH (

flink sql作业表定义部分字段问题

2021-06-02 文章 casel.chen
有一个flink sql mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢? 21/06/02 18:54:22 [Source:

如何根据flink sql解析出作业的血缘关系?

2021-05-27 文章 casel.chen
如何根据flink sql解析出作业的血缘关系?找到类似这样的血缘关系:source table A --> lookup table B --> sink table C

rocksdb状态后端最多保留checkpoints问题

2021-05-27 文章 casel.chen
作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?

Re:Re: flink sql cdc并行度问题

2021-05-27 文章 casel.chen
我的作业是用flink sql消费mysql cdc binlog并实时同步到mongodb。如果只开一个并行度的话,mongodb的写入速度可能追不上mysql的写入。所以我需要在sink端开大并行度。 我不清楚用sql怎么写keyBy,是不是要group by pk呢?我原来的想法是在MongoDBSinkFunction中开一个线程池,每个线程对应下游sink的一个并行度,每个线程带一个queue,MongoDBSinkFunction根据数据PK往对应的queue发数据,每个消费者线程从自己的queue pull数据再进行批量插入。不知道这样可不可行?

flink postgres jdbc catalog是只读的吗?

2021-06-02 文章 casel.chen
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql JDBC Catalog呢?

flink sql调整算子并行度的方法有哪些?

2021-06-05 文章 casel.chen
flink sql调整算子并行度的方法有哪些?通过 sql hint 可以调整吗?

回撤流的窗口统计

2021-06-05 文章 casel.chen
上游是binlog cdc消费获取的回撤流,现要使用flink sql统计分析该回撤流上每5分钟的sum值,不能使用常规tumble window是吗?只能使用group by ts配合state TTL进行? 另外,问一下flink sql的state TTL只能是全局设置吗?能够通过在sql hint上添加从而可以细粒度控制吗?

flink sql cdc作数据同步作业数太多

2021-06-06 文章 casel.chen
flink sql cdc作数据同步,因为是基于库+表级别的,表数量太多导致作业数太多。请问能否用flink sql cdc基于库级别同步?这样作业数量会少很多。

flink状态查看工具

2021-05-25 文章 casel.chen
我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。 查看checkpoint页显示状态有17MB,checkpoint耗时要2s。 想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?

如何将canal json格式数据按操作类型过滤

2021-07-07 文章 casel.chen
使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete 操作的数据插入到文件系统,因为要做历史数据存档用。 查了下官网 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata {"data":[{"id":"111","name":"scooter","description":"Big 2-wheel

java.lang.LinkageError: loader constraint violation

2021-07-07 文章 casel.chen
使用flink 1.12.1,作业依赖了flink-sql-connector-kafka (依赖kafka-client 2.4.1) 和一个消费阿里dts应用的connector(该connector依赖kafka-client 1.0.0),出现下面的异常后我将flink-sql-connector-kafka依赖的kafka-client也换成了1.0.0,结果运行作业还是报同样的异常。请问这是jar包冲突造成的吗?应该怎么解决? Caused by: java.lang.LinkageError: loader constraint violation: loader

local运行模式下不会生成checkpoint吗?

2021-06-28 文章 casel.chen
我在本地使用local运行模式运行flink sql,将数据从kafka写到mongodb,mongodb connector是自己开发的,实现了CheckpointedFunction接口,debug的时候发现数据进来的时候有调用invoke方法,但没有调用initialState和snapshotState方法,我有设置enableCheckpoint,同样的程序使用kubernetes部署发现是会调用snapshotState方法。我的问题是:local运行模式下不会生成checkpoint吗?

flink on native k8s要如何动态改变日志配置?

2021-07-09 文章 casel.chen
flink运行在原生k8s上,现在想要修改Root Logger Level和动态添加 Logger Name -> Logger Level,以及用户可以传入自定义的日志模板,目前有办法做到么?

flink作业日志能否保存到oss?

2021-07-11 文章 casel.chen
我们使用k8s运行flink作业,作业日志存储在容器中,一旦作业挂了容器销毁了就没法获取出问题的日志,有什么办法可以将日志保存到oss上么?通过配置启动history server吗?

flink批作业需要哪些配置?

2021-07-12 文章 casel.chen
flink运行批作业相较于流作业需要修改/添加哪些配置呢? 另外,使用方面有什么特别需要注意的地方吗?谢谢!

Re:Re: flink on native k8s要如何动态改变日志配置?

2021-07-12 文章 casel.chen
ent/resource-providers/native_kubernetes/#changing-the-log-level-dynamically > >Best, >Yang > >casel.chen 于2021年7月9日周五 下午8:29写道: > >> flink运行在原生k8s上,现在想要修改Root Logger Level和动态添加 Logger Name -> Logger >> Level,以及用户可以传入自定义的日志模板,目前有办法做到么?

flink sql cdc数据按主键keyby入库问题

2021-07-10 文章 casel.chen
场景:mysql数据实时同步到mongodb. 上游mysql binlog日志发到一个kafka topic, 不保证同一个主键的记录发到相同的partition,为了保证下游sink mongodb同一主键的所有记录按序保存,所以需要按主键keyby。然后下游再批量写入mongodb。 问题:flink sql有办法解决上述问题?如果可以的话,要怎么写? create table person_source ( id BIGINT PRIMARY KEY NOT FORCED, name STRING, age BIGINT ) with (

flink run命令是否支持读取远程文件系统中的jar文件?

2021-04-22 文章 casel.chen
flink run是否支持读取远程文件系统,例如oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。

Flink standalone on k8s HA异常

2021-02-08 文章 casel.chen
我试着答k8s上部署flink standalone集群,做HA之前集群是能够正常work的,在做HA的时候发现在configmap中添加了如下两个HA配置后JM就会抛异常,这是为什么? high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery 2021-02-09 00:03:04,421 ERROR

flink kubernetes application频繁重启TaskManager问题

2021-04-04 文章 casel.chen
最近试用flink kubernetes application时发现TM不断申请再终止,而且设置的LoadBalancer类型的Rest服务一直没有ready,查看不到flink web ui,k8s日志如下,这是什么原因?是因为我申请的资源太小么? = 启动参数 "kubernetes.jobmanager.cpu": "0.1", "kubernetes.taskmanager.cpu": "0.1", "taskmanager.numberOfTaskSlots": "1", "jobmanager.memory.process.size":

Flink on Native K8S模式下如何配置StandBy做高可用?

2021-03-25 文章 casel.chen
Flink on K8S Standalone模式下可以通过yaml启多个JM,但是在Native K8S模式下要如果做呢?有文档资料介绍吗?谢谢!

请问有flink + hudi或iceberg + aliyun oss的示例吗?

2021-03-15 文章 casel.chen
请问有flink + hudi或iceberg + aliyun oss的示例吗?谢谢!

flink sql sink多数据源问题

2021-03-10 文章 casel.chen
请教一下flink sql多条数据sink用 statement set 语句时, 1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗? 2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb & polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb & polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?

flink cdc遇到数据源大事务怎么处理?

2021-03-10 文章 casel.chen
flink cdc对接上游的mysql或pg业务库时遇到业务库大批量修数或schema变更是怎么处理的? 会不会瞬间产生很多changelog records打爆flink应用?如果会的话应该要如何避免呢?谢谢!

flink sql中如何使用异步io关联维表?

2021-03-03 文章 casel.chen
flink sql中如何使用异步io关联维表?官网文档有介绍么?

flink on native kubernetes application Rest服务暴露问题

2021-04-07 文章 casel.chen
我在阿里云k8s上部署flink on native kubernetes application,默认用的服务暴露类型是 LoadBalancer,启动后会在公网暴露rest url。运维管理人员不允许这样,说是只能使用固定预先申请的几个SLB,但我在flink官网没有找到有参数设置LoadBalancerIP,这样情况要怎么实现?

GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
使用 flink sql 1.12.1时遇到三个问题: 1. GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mcsp_pay_log, ... 按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract table转成append

Re:Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
flink window doesn't support retract stream 的话有什么workaround办法吗?常见的场景有 业务表cdc -> kakfa -> flink按时间窗口聚合 如果业务表是只会insert的日志表,该如何将retract table转换成普通table? GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog,

Re:如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-04-19 文章 casel.chen
我也遇到同样的问题 GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mcsp_pay_log, ... 按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract table转成append table?

flink run是否支持读取oss://或hdfs://路径下的jar文件?

2021-04-13 文章 casel.chen
flink run是否支持读取oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。 private PackagedProgram( @Nullable File jarFile, List classpaths, @Nullable String entryPointClassName, Configuration configuration, SavepointRestoreSettings

flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-20 文章 casel.chen
目标是用flink作业实现类似canal server的功能 CREATE TABLE `binlog_table` ( `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT, `filter`

Re:回复: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?

2021-04-21 文章 casel.chen
>发送时间:2021年4月21日(星期三) 下午2:16 >收件人:"user-zh" >主题:Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗? > > > >Hi casel. >flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。 > >https://github.com/ververica/flink-cdc-connectors/blob/master/README.md > >ca

flink k8s高可用如何使用oss作为high-availability.storageDir?

2021-02-17 文章 casel.chen
如题,在k8s环境下不想使用hdfs作为high-availability.storageDir,有没有办法直接使用oss呢?checkpoint和savepoint已经能够使用oss了。

flink 1.12.0 k8s session部署异常

2021-02-07 文章 casel.chen
在k8s上部署sesson模式的flink集群遇到jobmanager报如下错误,请问这是什么原因造成的?要如何fix? 2021-02-07 08:21:41,873 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2021-02-07

flink on k8s日志时间戳时区问题

2021-02-18 文章 casel.chen
目前是UTC时区的,怎样才能设置成当地的东8区呢?谢谢! 2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started 2021-02-19 01:34:22,155 INFO akka.remote.Remoting [] - Starting remoting 2021-02-19 01:34:21,259 INFO

Re:回复:flink sql cdc发到kafka消息表名信息缺失问题

2021-04-22 文章 casel.chen
我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。 在 2021-04-22 11:01:22,"飞翔" 写道: 既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka, 比如canal的样例,虽然after

flink sql cdc发到kafka消息表名信息缺失问题

2021-04-21 文章 casel.chen
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table 这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢? CREATE TABLE

flink on k8s作业日志持久化问题

2021-08-22 文章 casel.chen
flink 1.12.1版本,作业通过flink run命令提交,运行在native k8s上,有个问题:作业日志要如何持久化下来?因为发现作业重启后,pod销毁,落在pod本地的日志也看不到了,不知道出错的root cause。有没有办法将作业日志持久化下来?比如存到ELK或阿里云oss上面。另外,我们使用的是阿里云 EKS,是否可以配置作业使用阿里云日志服务呢?我知道k8s应用可以配置ecs宿主机路径映射存储,但这需要修改创建作业的yaml文件,不过我没有看到flink on k8s启动命令有这个选项,求解答,谢谢!

pyflink安装好后启动flink cluster失败

2021-09-04 文章 casel.chen
pip list | grep apache apache-beam2.27.0 apache-flink 1.13.2 apache-flink-libraries 1.13.2 $ python Python 3.7.5 (v3.7.5:5c02a39a0b, Oct 14 2019, 18:49:57) [Clang 6.0 (clang-600.0.57)] on darwin Type "help", "copyright",

flink sql是否支持动态创建sink table?

2021-09-17 文章 casel.chen
上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink sql支持动态创建sink table吗?

flink on native kubernetes作业cpu实际使用量与请求量相差太大问题

2021-09-16 文章 casel.chen
我们使用Flink运行实时作业在Kubernetes,发现作业实际使用的CPU资源远远小于作业请求量,但是将作业请求量降低后发现作业启动不了。请问这是个案还是正常情况? 例如,我们一个作业请求了0.5个cpu,但实际使用量只有0.09左右,修改请求为0.2个cpu,作业启动不了。 现在整个k8s集群有96个cpu,请求了86个cpu,实际使用只有7.5个cpu左右,这也相差太大了,有什么办法可以解决吗?

Flink SQL是否支持Count Window函数?

2021-09-17 文章 casel.chen
今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time window,问一下官方是否打算sql支持count window呢? 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!

Flink on native k8s如何自定义挂载盘?

2021-09-17 文章 casel.chen
为了监控TM OOM情况发生,我们在启动作业的时候添加了如下参数 -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/oom.bin" 想在OOM发生的时候能生成HeapDumpFile,以便事后分析。 但是因为OOM时TM所在的pod会被销毁,因此想挂载一个网络盘持久化HeapDumpFile。 请问Flink on native k8s要如何自定义挂载盘呢?使用的Flink版本是1.12.5

Flink SQL官方何时能支持redis和mongodb连接器?

2021-09-17 文章 casel.chen
redis和mongodb经常在工作中用到,但Flink官方一直没有提供这两个标准连接器,想问一下什么时候能正式release方便大家使用呢? ps: behair库已经很久没更新了,对应的flink版本太低。

flink UDF使用jsonpath解析出错

2021-07-14 文章 casel.chen
项目用到了下面依赖,使用flink-shaded-hadoop-2-uber因为使用到了YarnExecutor提交作业 org.apache.flink flink-table-planner-blink_2.12 1.12.1 org.apache.flink flink-shaded-hadoop-2-uber 2.8.3-10.0

Re:Re: Flink SQL向下兼容吗?

2021-08-11 文章 casel.chen
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么? 在 2021-08-11 16:54:56,"Leonard Xu" 写道: >这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级, >DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 >DDL的, >只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。 >

flink sql聚合后collect收集数据问题

2021-08-11 文章 casel.chen
源表三个字段 name, color, ts 按时间窗口聚合后想根据name group by取colors数组 create table source_table ( name STRING, color STRING, ts TIMESTAMP, WATERMARK ts for ts ) create table sink_table ( name STRING, colors ARRAY ) 1. 请问这个select语句要怎么写? select name, collect(color) as colors from

flink sql统计IP出现次数TopN问题

2021-08-03 文章 casel.chen
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件 源表数据 user_name, source_ip, ts 张三, 100, 00:08 张三, 104, 00:12 张三, 100, 00:15 张三, 101, 00:35 张三, 100, 00:38 张三, 102, 00:40 张三, 102, 00:45 张三, 101, 00:47 张三, 100, 00:55 张三, 100, 01:15 李四, 200, 01:17 李四, 200, 01:19 李四, 200, 01:27 王五,

  1   2   3   4   >