Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?

2023-03-21 文章 casel.chen
Flink实时计算平台在k8s上以Application模式启动作业如何实时同步作业状态到平台?作业一旦crash失败就会被k8s回收到相关的pod,没法通过web url去获取作业状态,有什么别的办法吗?通过metrics? 如果是的话具体是哪一个metric值呢?

Re:Re: flink作业保存的状态文件目录在aliyun oss上打不开

2023-03-21 文章 casel.chen
检查过了,当前`state.checkpoints.num-retained`参数值是3 在 2023-03-21 20:05:35,"Shammon FY" 写道: >Hi > >你可以检查一下checkpoint配置`state.checkpoints.num-retained`,是否保存的checkpoint数量太多了? > >Best, >Shammon FY > > >On Tue, Mar 21, 2023 at 11:55 AM casel.chen wrote: > >> 有一个flink

Re: prometheus监控flink作业经常OOM

2023-03-21 文章 Shammon FY
Hi 可以找一些go相关的内存分析工具,看下prometheus进程主要内存使用情况 Best, Shammon FY On Tue, Mar 21, 2023 at 10:16 AM casel.chen wrote: > > 线上用prometheus监控几百个flink作业,使用的是pushgateway方式,设置采样作业metrics周期是30秒,prometheus服务本身给了将近50GB内存,还是会经常发生OOM,请问有什么调优办法吗?

Re: flink作业保存的状态文件目录在aliyun oss上打不开

2023-03-21 文章 Shammon FY
Hi 你可以检查一下checkpoint配置`state.checkpoints.num-retained`,是否保存的checkpoint数量太多了? Best, Shammon FY On Tue, Mar 21, 2023 at 11:55 AM casel.chen wrote: > 有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun >

flink作业保存的状态文件目录在aliyun oss上打不开

2023-03-20 文章 casel.chen
有一个flink cdc实现多表关联打宽的flink作业,作业状态达到20GB左右,远端状态存储用的是aliyun oss。今天作业运行失败打算手动从checkpoint恢复时发现保存作业状态的checkpoint目录(share目录)无法通过浏览器打开,后来使用命令行list了一下该目录下的文件有多达上万个文件。该flink作业用的是rocksdb state backend并开启了增量checkpoint。请问有什么办法可以解决这个问题吗?share目录下这么多文件是因为增量checkpoint遗留下来的吗?

prometheus监控flink作业经常OOM

2023-03-20 文章 casel.chen
线上用prometheus监控几百个flink作业,使用的是pushgateway方式,设置采样作业metrics周期是30秒,prometheus服务本身给了将近50GB内存,还是会经常发生OOM,请问有什么调优办法吗?

Re:Re: 实时数据同步对比监控有什么好的工具和方案吗?

2023-03-18 文章 casel.chen
站在业务角度,监控指标包括数据的一致性(不多不少)和 数据的时效性(同步延迟时长在合理区间)。这2块有什么工具和方案吗? 在 2023-03-17 15:23:30,"Shammon FY" 写道: >Hi > >具体是要监控哪些信息?不同的信息会有不同的工具和方案,比如资源使用率、failover情况、同步数据延时等 > >Best, >Shammon FY > > >On Fri, Mar 17, 2023 at 10:52 AM casel.chen wrote: > >>

Re: 实时数据同步对比监控有什么好的工具和方案吗?

2023-03-17 文章 Shammon FY
Hi 具体是要监控哪些信息?不同的信息会有不同的工具和方案,比如资源使用率、failover情况、同步数据延时等 Best, Shammon FY On Fri, Mar 17, 2023 at 10:52 AM casel.chen wrote: > 业务上利用flink作业做实时数据同步,请问实时数据同步对比监控有什么好的工具和方案吗? > 实时同步链路:mysql -> kafka canal -> flink -> doris > > > 欢迎大家提供思路

Re: 业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

2023-03-17 文章 yue ma
你好,可以在source下发数据的时候做一些限速么 casel.chen 于2023年3月17日周五 10:53写道: > 使用flink cdc消费mysql binlog遇到业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

2023-03-16 文章 casel.chen
使用flink cdc消费mysql binlog遇到业务库刷数据瞬间cdc流量上涨打爆作业的问题有什么好的解决办法吗?

实时数据同步对比监控有什么好的工具和方案吗?

2023-03-16 文章 casel.chen
业务上利用flink作业做实时数据同步,请问实时数据同步对比监控有什么好的工具和方案吗? 实时同步链路:mysql -> kafka canal -> flink -> doris 欢迎大家提供思路

水位线对齐与空闲问题

2023-03-16 文章 haishui
hi, 我在1.15.x和1.16.1对水位线策略进行测试发现水位线对齐和idleness同时使用会造成变成空闲的source无法再消费kafka数据。这是一个bug吗? 我的水位线策略如下: WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofMillis(0)) .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))

Re: Re: 无法设置任务名

2023-03-16 文章 Shammon FY
Hi 控制台具体是指哪块?你可以检查下其他日志是否显示正常 Best, Shammon FY On Wed, Mar 15, 2023 at 11:29 PM wei_yuze wrote: > 我又试了一次,web UI有了,可能是我看漏了。我设置的任务名是XXX_Statistics。 > > > 但是控制台里的输出没有任务名。是log4j2级别不对吗?我在log4j2.properties里设置的输出等级为INFO > > > rootLogger.level = INFO > > > > > > > > 原始邮件 > > > > 发件人:"Weihua Hu"<

Re: 无法设置任务名

2023-03-15 文章 Weihua Hu
Hi, UI 显示的任务名是什么呢? Best, Weihua On Wed, Mar 15, 2023 at 8:02 PM wei_yuze wrote: > 您好! > > > > > 我在使用flink1.16.0。在通过这个方式设置了任务名: > streamExecutionEnvironment.execute("jobName") > 但是web UI 中并不显示出设置的用户名。请问哪位大佬能答疑一下,感谢!

Re: Flink-Sql Watermarkers问题

2023-03-15 文章 ying lin
Flink SQL 现在只能在create table 语句中指定watermark,另外一种迂回的做法,就是参考一下Flink SQL 把Tabe转成流,然后在流上做清洗后再指定watermark

回复:咨询yarn session 集群启动后在不重启的情况下如何更新一个jar包

2023-03-14 文章 17610775726
Hi wdmcode 看上去 per-job 或者 application 模式更适合你,session 模式的话是需要重启集群才能识别到新添加的 connector。 Best JasonLee 回复的原邮件 | 发件人 | wdmcode | | 发送日期 | 2023年03月14日 17:59 | | 收件人 | user-zh | | 主题 | 咨询yarn session 集群启动后在不重启的情况下如何更新一个jar包 | hi all 我在yarn集群使用yarn

Re: 咨询yarn session 集群启动后在不重启的情况下如何更新一个jar包

2023-03-14 文章 Shammon FY
Hi 如果自定义的connector是和作业打包在一起提交的,那可以可以只重启指定作业就可以了;如果这些connector是在flink session集群启动时加载的,一般最好重启集群,避免不同版本connector冲突 Best, Shammon FY On Tue, Mar 14, 2023 at 5:59 PM wdmcode wrote: > > hi all > 我在yarn集群使用yarn session方式启动了一个flink集群。集群中有一些自定义的Connector。自定义的Connector > Jar包放在本地的lib目录。 > 我如何在不重启yarn

咨询yarn session 集群启动后在不重启的情况下如何更新一个jar包

2023-03-14 文章 wdmcode
hi all 我在yarn集群使用yarn session方式启动了一个flink集群。集群中有一些自定义的Connector。自定义的Connector Jar包放在本地的lib目录。 我如何在不重启yarn session集群的情况下更新一个Connector呢。如果重启yarn session集群会导致所有任务都要重启。但是更新一个Connector只会影响部分任务。

Re: flink k8s 部署启动报错

2023-03-13 文章 Weihua Hu
_DIRTY.json 看下以这个结尾的文件,内容应该是一个 json,如果不是标准 json 说明数据已经异常了,可以尝试删除 Best, Weihua On Tue, Mar 14, 2023 at 11:23 AM Jason_H wrote: > 您好, > 我找到了我的ha目录,请教一下,怎么确定哪些数据是脏数据,可以允许删除的,这个有什么办法可以确定吗,我看到的都是些系统数据 > > > | | > Jason_H > | > | > hyb_he...@163.com > | > 回复的原邮件 > | 发件人 | Weihua Hu | >

回复: flink k8s 部署启动报错

2023-03-13 文章 Jason_H
您好, 我找到了我的ha目录,请教一下,怎么确定哪些数据是脏数据,可以允许删除的,这个有什么办法可以确定吗,我看到的都是些系统数据 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月14日 10:39 | | 收件人 | | | 主题 | Re: flink k8s 部署启动报错 | Hi, 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。 可以参考文档[1],检查相关的

回复: flink k8s 部署启动报错

2023-03-13 文章 Jason_H
您好, 对的,之前是正常启动的,突然失败了,然后我直接重启pod,就一直报这个错了。 | | Jason_H | | hyb_he...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月14日 10:39 | | 收件人 | | | 主题 | Re: flink k8s 部署启动报错 | Hi, 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。 可以参考文档[1],检查相关的 HA 路径,清理下异常数据

Re: flink k8s 部署启动报错

2023-03-13 文章 Weihua Hu
Hi, 看异常信息是 Flink 集群在启动时检索到 HA 路径上存在 DirtyResults 数据,但是数据已经不完整了,无法正常读取。 可以参考文档[1],检查相关的 HA 路径,清理下异常数据 另外问一下,之前是通过同名的 cluster-id 启动过 Flink 集群吗? [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#job-result-store-storage-path Best, Weihua On Tue, Mar 14, 2023 at

Re:Re: Re: Re: Re: flink on yarn 异常停电问题咨询

2023-03-13 文章 guanyq
我昨天模拟下断电的情况 10个ha文件的日期是错开的5秒一个 chk-xxx也不是都损坏了,有的是可以启动的,这个我也试了 现在情况是 yarn集群停电重启首先会循环尝试从10个ha的文件中启动应用,ha文件记录的chk的相关原数据 1.如果ha文件都损坏了,即使chk没有损坏,flink应用也是拉不起来的 现在想的是让hdfs上存在至少1组个可用的的ha文件及其对应的chk 现在是5秒一个chk,保存了10个,也会出现损坏无法启动的问题 5秒*10 = 50秒,也想知道多长时间的存档才能保证存在一组没有损坏ha和chk呢。 在 2023-03-14

Re: Re: Re: Re: flink on yarn 异常停电问题咨询

2023-03-13 文章 Guojun Li
Hi 确认一下这些 ha 文件的 last modification time 是一致的还是错开的? 另外,指定 chk- 恢复尝试了没有?可以恢复吗? Best, Guojun On Fri, Mar 10, 2023 at 11:56 AM guanyq wrote: > flink ha路径为 /tmp/flink/ha/ > flink chk路径为 /tmp/flink/checkpoint > > > 我现在不确定是这个ha的文件损坏了,还是所有chk都损坏,但是这个需要模拟验证一下。 > > > > > 会尝试从10个chk恢复,日志有打印 >

flink k8s 部署启动报错

2023-03-13 文章 Jason_H
hi,大家好 请教一个问题,我在k8s上部署的flink集群,启动不来,报如下的错误,大家有遇到过吗 java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown

Re: flink avro schema 升级变动,job如何平滑过渡

2023-03-13 文章 Shammon FY
Hi 从错误上看应该是schema跟数据不匹配导致导致的,看起来目前avro不支持这种schema变更新老数据一起处理 Best, Shammon.FY On Fri, Mar 10, 2023 at 2:29 PM Peihui He wrote: > java.io.IOException: Failed to deserialize Avro record. > at > >

回复: Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
好的感谢,我关注下 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月13日 18:49 | | 收件人 | | | 主题 | Re: Flink-Sql Watermarkers问题 | Hi 目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下

Re: GenericRowData与BinaryRowData的转换

2023-03-13 文章 Benchao Li
Hi zilong, 应该是没有内置的方法直接进行转换的,如果有需要,还是需要自己根据schema做一遍读取和写入。 另外,在FLINK-24403[1] 中加强了对于复杂类型的print能力,可以直接把他们cast成string来打印。 [1] https://issues.apache.org/jira/browse/FLINK-24403 zilong xiao 于2023年3月13日周一 16:22写道: > hi, benchao, 想问下有什么办法可以将BinaryRowData转成GenericRowData吗?我们业务场景需要对RowData >

Re: GenericRowData与BinaryRowData的转换

2023-03-13 文章 Shammon FY
Hi 你可以考虑将field数据从BinaryRowData中读取出来再转换成string试试 Best, Shammon.FY On Mon, Mar 13, 2023 at 4:21 PM zilong xiao wrote: > hi, benchao, 想问下有什么办法可以将BinaryRowData转成GenericRowData吗?我们业务场景需要对RowData > toString,BinaryRowData没有实现该方法QQAQ > > Benchao Li 于2021年4月9日周五 10:42写道: > > >

Re: Flink-Sql Watermarkers问题

2023-03-13 文章 Shammon FY
Hi 目前sql只能在create table时指定,不过有新的扩展功能,相关FLIP正在讨论中,你可以关注一下 https://cwiki.apache.org/confluence/display/FLINK/FLIP-296%3A+Extend+watermark-related+features+for+SQL Best, Shammon.FY On Mon, Mar 13, 2023 at 6:29 PM 吴先生 <15951914...@163.com> wrote: > hi, > 我在使用Flink-Sql 1.14版本时能否不在create

Flink-Sql Watermarkers问题

2023-03-13 文章 吴先生
hi, 我在使用Flink-Sql 1.14版本时能否不在create table处指定watermarkers,因为源数据需要做一些清洗之后再指定水位线 | | 吴先生 | | 15951914...@163.com |

Re:Re: Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 guanyq
理解了,非常感谢。 在 2023-03-13 16:57:18,"Weihua Hu" 写道: >图片看不到,可以找一个图床上传图片,在邮件列表中贴一下链接。 > >YARN 拉起 AM 还受 "yarn.application-attempt-failures-validity-interval"[1] >控制,在这个时间内达到指定次数才会退出。 > >[1]

Re: Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 Weihua Hu
图片看不到,可以找一个图床上传图片,在邮件列表中贴一下链接。 YARN 拉起 AM 还受 "yarn.application-attempt-failures-validity-interval"[1] 控制,在这个时间内达到指定次数才会退出。 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval Best, Weihua On Mon, Mar 13, 2023 at

Re:Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 guanyq
图片在附件 但是实际却是超过了10次。。 在 2023-03-13 15:39:39,"Weihua Hu" 写道: >Hi, > >图片看不到了 > >按照这个配置,YARN 应该只会拉起 10 次 JobManager。 > >Best, >Weihua > > >On Mon, Mar 13, 2023 at 3:32 PM guanyq wrote: > >> flink1.10版本,flink配置如下 >> yarn.application-attempts = 10 (yarn尝试启动flink job的次数为10) >>

Re: GenericRowData与BinaryRowData的转换

2023-03-13 文章 zilong xiao
hi, benchao, 想问下有什么办法可以将BinaryRowData转成GenericRowData吗?我们业务场景需要对RowData toString,BinaryRowData没有实现该方法QQAQ Benchao Li 于2021年4月9日周五 10:42写道: > GenericRowData和BinaryRowData都是RowData这个接口的具体实现。 > 所以你只需要针对RowData进行编程即可,不能假设它使用哪个具体实现。 > > 关于你的问题,在算子之间数据计算和转换的时候,会有很多地方构造出来BinaryRowData, >

Re: flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 Weihua Hu
Hi, 图片看不到了 按照这个配置,YARN 应该只会拉起 10 次 JobManager。 Best, Weihua On Mon, Mar 13, 2023 at 3:32 PM guanyq wrote: > flink1.10版本,flink配置如下 > yarn.application-attempts = 10 (yarn尝试启动flink job的次数为10) > 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图 >

flink on yarn关于yarn尝试重启flink job问题咨询

2023-03-13 文章 guanyq
flink1.10版本,flink配置如下 yarn.application-attempts = 10 (yarn尝试启动flink job的次数为10) 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图 请问appattempt_1678102326043_0006_000409每个序号不是代表一次尝试么

flink sql多条cdc数据流实时regular join如何减少作业状态?

2023-03-11 文章 casel.chen
当前flink实时作业接的kafka canal json格式的cdc数据,mysql表会有新增和更新数据,但不会有物理删除。 如果直接多条cdc数据流实时关联会导致作业状态很大,请教: 1. 有没有什么办法可以减少作业状态? 2. cdc格式的retract流可以加去重变成append流吗? 3. 使用append流多流关联是不是能减少作业状态?

Re: flink avro schema 升级变动,job如何平滑过渡

2023-03-09 文章 Peihui He
java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)

Re:Re: Re: Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 guanyq
flink ha路径为 /tmp/flink/ha/ flink chk路径为 /tmp/flink/checkpoint 我现在不确定是这个ha的文件损坏了,还是所有chk都损坏,但是这个需要模拟验证一下。 会尝试从10个chk恢复,日志有打印 2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.

Re: Re: Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 Weihua Hu
Hi 一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常) 有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint 以及最终尝试从哪一次 cp 恢复的。 也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复 Best, Weihua On Fri, Mar 10, 2023 at 10:38 AM guanyq wrote: > 没有开启增量chk >

Re:Re: Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 guanyq
没有开启增量chk 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了 错误日志为: java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException: The meta file length

Re: Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 Yanfei Lei
Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定 chk重启[1]。 > flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。 请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。 >

Re:Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 guanyq
目前也想着用savepoint处理异常停电的问题 但是我这面还有个疑问: flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。 就很奇怪是不是10个checkpoint都没落盘导致的。 想问下: checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。 在 2023-03-10 08:47:11,"Shammon FY" 写道: >Hi > >我觉得Flink

Re: flink on yarn 异常停电问题咨询

2023-03-09 文章 Shammon FY
Hi 我觉得Flink 作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业 Best, Shammon On Thu, Mar 9, 2023 at 10:06 PM guanyq wrote: > 前提 > 1.flink配置了高可用 > 2.flink配置checkpoint数为10 > 3.yarn集群配置了任务恢复 > 疑问 > yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动

Re: flink问题咨询

2023-03-09 文章 Shammon FY
Hi 我个人觉得可以将你现在的process计算分为两部分,你提到每隔20s触发的delta计算部分放到stream2部分,类似于这种形式 stream1.keyBy().connect(stream2.keyBy().process(处理增量,每20秒触发输出)).process(根据增量更新ListState) 这样不需要从ListState中去查找哪些数据被更新了 Best, Shammon On Thu, Mar 9, 2023 at 10:48 AM 陈隽尧 wrote: > 您好, > > >

flink on yarn 异常停电问题咨询

2023-03-09 文章 guanyq
前提 1.flink配置了高可用 2.flink配置checkpoint数为10 3.yarn集群配置了任务恢复 疑问 yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动

flink问题咨询

2023-03-08 文章 陈隽尧
您好, 我是flink一名新用户,最近在项目中需要用到flink完成一项业务功能,但目前遇到了一些一些困难,想咨询你一下是否有合适的解决方案,期待您的回信 问题背景:我们需要基于股票交易流水和股票行情去计算股票账户层面的一些指标(为简化场景,假定账户指标只有持仓量,买入均价,市值),页面前端20s刷新一次,指标计算想基于flink的dataStream Api实现,但遇到一个问题,目前初步想法如下,请flink大神帮忙指导 初步方案设想:假定stream1: 股票交易流水, stream2:股票行情流水

Re: Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-08 文章 Jane Chan
从 plan 上看起来在 sink 节点这里因为推导不出 upsert key 加上了 SinkUpsertMaterializer[1], 这里会按照 sink 表定义的主键进行 keyby shuffle[2], 只能保证最终一致性. 另外你的操作描述中 schema 为三列, 但 DDL 是四列, 且格式乱了. 一些可能的建议如下 1. 如果上游数据有主键并且也是 rowid 的话, 建议在 Flink source 表上声明 PK, 避免额外生成 materializer 节点; 同时注意在声明 Flink source 表时不要带上 metadata 列 (比如 op),

Re: Flink异步Hbase导致Too many open files异常

2023-03-08 文章 Ran Tao
+1 有遇到过类似 fd 泄露的问题。注意 close 的时候buffer 数据刷盘, 然后资源关闭,future cancel。 Best Regards, Ran Tao Weihua Hu 于2023年3月8日周三 16:52写道: > Hi, > > 通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。 > > 在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。 > > Best, > Weihua > > > On Wed, Mar 8, 2023 at 4:19 PM

Re: Flink异步Hbase导致Too many open files异常

2023-03-08 文章 Weihua Hu
Hi, 通过代码看作业在Failover 时的确会有 HBaseClient 的资源泄露。 在 HbaseDimensionAsyncFunc 中重写一下 close 方法,释放掉 HBaseClient。 Best, Weihua On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote: > Hi > 我在使用Async Hbase时频繁遇到too many open file异常,程序自动重启后会立即报错,具体报错日志如下: > 2023-03-08 16:15:39 >

Re: Re: flink on K8S(operator) 如何获取 Accumulator

2023-03-07 文章 Shammon FY
Hi 像上面提到的,jobClient.get().getAccumulators()会从flink集群获取作业相关信息,如果是application模式,作业结束后flink集群也会退出。你可以通过其他方式,包括session集群运行或者启动history server等方式,也可以通过自定义metrics等输出到其他系统 Best, Shammon On Tue, Mar 7, 2023 at 11:27 PM 李银苗 wrote: > 退订

Re:Re: flink on K8S(operator) 如何获取 Accumulator

2023-03-07 文章 李银苗
退订

Re: flink on K8S(operator) 如何获取 Accumulator

2023-03-06 文章 Weihua Hu
Hi, 按照你的描述,我猜测你使用的是 Application 模式吧?这种模式下 user code 会在 JobManager 侧执行,Job 执行结束后会直接 shutdown cluster。 可以尝试使用 session mode[1] 部署 cluster [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode Best, Weihua On Mon, Mar 6, 2023

FlinkCEP GroupPattern 匹配结果异常问题

2023-03-06 文章 傅宣登
各位好, 我发现 FlinkCEP 中涉及 GroupPattern 的一些查询的匹配结果和直觉不太符合。 以如下查询为例: Pattern.begin("pl").where( new SimpleCondition() { @Override public boolean filter(Event value) { return value.getName() == 2; } }

Re: Flink作业tm Connection timed out异常问题

2023-03-06 文章 Shammon FY
Hi 很多原因都可能会导致连接失败问题,包括机器故障、系统问题或者服务器负载,如果是怀疑负载问题你可以找几台服务器和这台有疑问的服务器组成个小集群,提交一些作业,让这台服务器负载不要太高,观察一下作业运行情况 Best, Shammon On Mon, Mar 6, 2023 at 8:49 PM crazy <2463829...@qq.com.invalid> wrote: > 报错日志下面这个一样,是同一个问题么 > https://issues.apache.org/jira/browse/FLINK-19925 > > > 其中描述到服务器 "high cpu usage

Re: flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 Shammon FY
Hi 如果没有现成的系统函数,你可以写个自定义udf来实现 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ Best, Shammon On Mon, Mar 6, 2023 at 7:46 PM 唐世伟 wrote: > > 我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?

flink cdc connector计划支持hudi change data capture吗?

2023-03-06 文章 casel.chen
flink cdc connector计划支持hudi change data capture吗?

flink on K8S(operator) 如何获取 Accumulator

2023-03-06 文章 wangwei
Hi,大佬们 如何在任务结束后获取Accumulator 数据? 参考代码:(但是无法获取) ableResult execute = statementSet.execute(); Optional jobClient = execute.getJobClient(); jobClient.get().getAccumulators().get() PS: 最初的需求是: 对任务同步的数据量做统计。希望在批任务结束后,准确的获取Accumulator 中值,但是在K8S 中无法获取? 大佬求助!!先磕为敬

flinkSQL无法实现两个timestamp(3) 相减

2023-03-06 文章 唐世伟
我们需要对两个时间相减,精度为毫秒。但是无论是TIMESTAMPDIFF函数,还是先转成UNIX_TIMESTAMP,都只支持秒的精度。请问还有其他方法吗?

Re: Flink作业tm Connection timed out异常问题

2023-03-05 文章 Yuxin Tan
不建议这样做,因为这样会掩盖问题。 但如果一定要配置"重试次数"或"超时时长" 这些参数,会涉及到很多参数,比如 akka.tcp.timeout, taskmanager.network.netty.client.connectTimeoutSec, taskmanager.network.retries等等,具体可以参考[1]。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/ Best, Yuxin crazy

Re: Flink作业tm Connection timed out异常问题

2023-03-05 文章 Yuxin Tan
"如果进程没被调度到这台机器上,任务正常",从给出的描述来看,确实很可能是 A 这台机器有问题。 可以检查机器 A 的网络、内存、CPU 指标或者监控是否正常,与其他机器是否存在不同。比如网络参数的配置、机器内存是否存在损坏、机器是否存在异常进程或负载等等。 如果硬件问题,系统日志有可能有一些报错。也可以使用一些机器检查工具, dmesg/vmstat等。 Best, Yuxin crazy <2463829...@qq.com.invalid> 于2023年3月6日周一 14:23写道: > 各位大佬好,有个线上作业频繁failover,异常日志如下: > >

Flink????tm Connection timed out????????

2023-03-05 文章 crazy
failover 2023-03-05 11:41:07,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process (287/300) (b3ef27fec49fe3777f830802ef3501e9) switched from RUNNING to FAILED on container_e26_1646120234560_82135_01_97 @

Re:Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-05 文章 陈佳豪
刚做了一下测试 目前假定有3行数据需要同步(全量): | 编号 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 1311313 | 789 | 这个时候我修改第四行数据的两个字段(增量): | 1 | 电话 | 座机 | | 1 | 1311313 | 123 | | 2 | 1311313 | 456 | | 3 | 13113133110 | 888 | 修改完后我删除字段2这个时候去mysql看结果2是正确被删除,且无新增的(操作正确).

Re:Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-05 文章 陈佳豪
hi 早上好 我将flink升级到了1.16.1的版本去执行kafka同步到mysql的任务,发现还是存在一样的问题,我本机执行了explain的执行过程给的输出如下 == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机]) +- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1],

Re: Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-04 文章 Jane Chan
Hi, 抱歉, 这里 typo 了, 应该是 1.16.1. 我在 1.16.1 上验证了你之前发的 query, 是可以正常删除的. 可以在 1.16.1 上尝试下, 也可以试试在 1.15.2 上使用 EXPLAIN CHANGELOG_MODE INSERT INTO...[1] 将 plan 打印出来看看. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ 祝好! Jane On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪

Re:Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-04 文章 陈佳豪
hi 你好 目前没有1.16.2版本的吧? 我看flink官网都是1.16.0 或者是1.16.1的 在 2023-03-02 11:52:41,"Jane Chan" 写道: >Hi, > >可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 >query 在 1.16.2 上验证没有问题 > >[1]

Re: flink sql

2023-03-03 文章 小昌同学
好滴 谢谢大佬呀 | | 小昌同学 | | ccc0606fight...@163.com | Replied Message | From | 17610775726<17610775...@163.com> | | Date | 3/3/2023 15:55 | | To | user-zh@flink.apache.org | | Cc | user-zh | | Subject | Re:flink sql | Hi 可以通过设置 pipeline.operator-chaining = false 来实现。 Best JasonLee

Re:flink sql

2023-03-02 文章 17610775726
Hi 可以通过设置 pipeline.operator-chaining = false 来实现。 Best JasonLee Replied Message | From | 小昌同学 | | Date | 03/3/2023 15:50 | | To | user-zh | | Subject | flink sql | 各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |

flink sql

2023-03-02 文章 小昌同学
各位大佬,请教一下如何使用flink sql实现DataStreaming的disableOperatorChaining功能 | | 小昌同学 | | ccc0606fight...@163.com |

回复: Flink内存问题

2023-03-02 文章 吴先生
感谢,我看下 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年3月3日 10:37 | | 收件人 | | | 主题 | Re: Flink内存问题 | Hi, 针对问题 2, 可以增加下列环境变量来排除 Glibc 的问题,详情可以参考[1] containerized.master.env.MALLOC_ARENA_MAX: 1 containerized.taskmanager.env.MALLOC_ARENA_MAX: 1 [1]

Re:flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 程龙
这种情况下有两种方式可以处理 1> 注册表-使用join方式直接拼接成大宽表写入 2> 每个任务-直接写入下游数据 ,每个任务只更新自己的字段即可(因为主键相同) 在 2023-03-02 20:59:59,"casel.chen" 写道: >flink sql jdbc connector是否支持多流拼接? >业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。 >每条流更新大宽表的一部分字段。

Re: flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-02 文章 Shengkai Fang
听上去像是数据乱序了。可以看看这个文档对应的解决下[1] [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/ Best, Shengkai casel.chen 于2023年3月1日周三 16:18写道: > flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 > >

Re: Flink内存问题

2023-03-02 文章 Weihua Hu
Hi, 针对问题 2, 可以增加下列环境变量来排除 Glibc 的问题,详情可以参考[1] containerized.master.env.MALLOC_ARENA_MAX: 1 containerized.taskmanager.env.MALLOC_ARENA_MAX: 1 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_trouble/ Best, Weihua On Thu, Mar 2, 2023 at 8:10 PM 吴先生

Re: flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 Shengkai Fang
hi. 手动使用 join 将多个流拼接起来? Best, Shengkai casel.chen 于2023年3月2日周四 21:01写道: > flink sql jdbc connector是否支持多流拼接? > 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。 > 每条流更新大宽表的一部分字段。

Re: 退订

2023-03-02 文章 Vincent Woo
Hi,退订请发送邮件到 user-zh-unsubscr...@flink.apache.org Best, Vincent Woo > 2023年3月3日 08:41,zhangjunjie 写道: > > 退订 > >

退订

2023-03-02 文章 zhangjunjie
退订

flink sql jdbc connector是否支持多流拼接?

2023-03-02 文章 casel.chen
flink sql jdbc connector是否支持多流拼接? 业务上存在基于主键打宽场景,即多个流有相同的主键字段,除此之前其他字段没有重叠,现在想打成一张大宽表写入mysql/oracle这些关系数据库。 每条流更新大宽表的一部分字段。

回复: Flink内存问题

2023-03-02 文章 吴先生
Hi, 目前分析问题应该在堆外,大概率是managed和overhead这两部分,这两部分的内存分配比例都是默认配置,通过网上的相关资料来看有两种解决方案: 1、调大managed和overhead这两块的内存比例, 问题:调整多大合适?是否调整之后还会持续增长 2、还有另一种说法是glibc内存分配器有个64M的问题引起(这里可有深入研究),替换为jemalloc可避免 问题:有具体的知道方案吗 | | 吴先生 | | 15951914...@163.com | 回复的原邮件 | 发件人 | Shammon FY | | 发送日期 | 2023年3月2日

Re: Flink内存问题

2023-03-02 文章 Shammon FY
Hi 如果有搜集metrics,可以根据metrics查看一下是哪部分内存上涨导致container被kill掉;然后将上涨比较快的container内存dump一下,查看具体是哪些对象占用内存比较多 Best, Shammon On Thu, Mar 2, 2023 at 7:14 PM 吴先生 <15951914...@163.com> wrote: > Hi, > Flink版本:1.12 > 部署模式:on yarn per-job > 开发方式:DataStream Api > 状态后端:RocksDB >

Flink内存问题

2023-03-02 文章 吴先生
Hi, Flink版本:1.12 部署模式:on yarn per-job 开发方式:DataStream Api 状态后端:RocksDB Job逻辑为一个15分钟的窗口计算,任务在运行一段时间后会出现内存使用超限,container被yarn kill的现象,目前有不少任务都会存在类似问题: Closing TaskExecutor connection container_e02_1654567136606_1034_01_12 because: [2023-03-02 08:12:44.794]Container

Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 Jane Chan
Hi, 可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 query 在 1.16.2 上验证没有问题 [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ Best, Jane On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: > flink ,kafka连接 jdbc连接版本都是1.15.2的 > > > > > > >

Re: Re: Flink SQL 如何优化以及处理反压

2023-03-01 文章 Guojun Li
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。 如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。 On Tue, Jan 31, 2023 at 6:01 PM lxk wrote: > 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 > 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 > > > > > > > > > > > > > > > > > > 在

Re:使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
flink ,kafka连接 jdbc连接版本都是1.15.2的 在 2023-03-01 18:14:35,"陈佳豪" 写道: >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >String kafka = "CREATE TABLE `电话` (`rowid` >VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >VARCHAR(2147483647),`63fd660536521f81a2cfabad`

使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 String kafka = "CREATE TABLE `电话` (`rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' =

metrics.latency.interval指标应该如何查看?

2023-03-01 文章 陶浩然
我使用的flink版本是1.14.0,在flink-conf.yaml里添加了latency的配置 但是我在web-ui中没有找到这个指标 请问下是哪里出问题了。 flink任务是从kafka中读数据写入mysql中 public class FlinkSqlTask { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env =

flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-01 文章 casel.chen
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。 请问: 1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生? 2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗? [1]

退订

2023-02-28 文章 704669594
退订

Re: managed memory占用100%的问题

2023-02-28 文章 Shammon FY
Hi 根据邮件里的异常信息看了下代码,这里的RecordArea会从managed memory申请内存分片 你可以根据作业流量尝试调整下窗口大小或者内存分配 Best, Shammon On Tue, Feb 28, 2023 at 6:47 PM Junrui Lee wrote: > Hi, > > 图片挂掉了,能不能直接用文字描述配置文件? > > Best, > Junrui > > 生于八十年代 <623730...@qq.com.invalid> 于2023年2月28日周二 18:31写道: > > > 社区的各位大佬们有个问题咨询一下大家: > > 1. > >

Re: managed memory占用100%的问题

2023-02-28 文章 Junrui Lee
Hi, 图片挂掉了,能不能直接用文字描述配置文件? Best, Junrui 生于八十年代 <623730...@qq.com.invalid> 于2023年2月28日周二 18:31写道: > 社区的各位大佬们有个问题咨询一下大家: > 1. > 问题背景:我们在使用flink读取消费kafka中的hdfs路径消息,然后从hdfs中读取文件,做完处理后写入hive,整个过程都是以流式的过程完成,而不是批处理的过程; > 目前遇到的问题是任务运行一段时间之后,kafka就开始出现hdfs路径消息积压,目前发现managed >

managed memory????100%??????

2023-02-28 文章 ????????????
??1. flinkkafkahdfshdfs??hive ??kafka??hdfs??managed

Re:Flink Kafka Sink时间戳异常

2023-02-27 文章 haishui
hi, 这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。 如果想要更改时间戳,可以实现一个ProcessFuncton TimestampedCollector collector = (TimestampedCollector) out; collector.setAbsoluteTimestamp( ); collector.collect(value);

Re: 退订

2023-02-27 文章 Weihua Hu
退订请发送邮件到 user-zh-unsubscr...@flink.apache.org Best, Weihua On Tue, Feb 28, 2023 at 12:13 AM zhangjunjie wrote: > 退订 > > >

退订

2023-02-27 文章 zhangjunjie
退订

Re: 【Windowing TVF】 GROUP BY window_start, window_end 没有输出

2023-02-27 文章 Weihua Hu
Hi, 可以详细描述下你的使用 case 吗?用的 SQL 语句是什么样子的 Best, Weihua On Mon, Feb 27, 2023 at 12:51 PM wei_yuze wrote: > 您好! > > > > > 我在使用Windowing table-valued functions (Windowing TVFs) 的时候,GROUP BY 中一旦加上 > window_start, window_end 就没有输出,但也不报错。请问有哪位大佬知道是什么原因吗? > > Lucas

退订

2023-02-26 文章 jiafu

????

2023-02-25 文章 zhangjunjie
| ?? | fei<704669...@qq.com.INVALID> | | | 2023??02??26?? 09:02 | | ?? | user-zh | | ?? | | | | |

Re:

2023-02-25 文章 Jane Chan
退订请发送邮件至 user-zh-unsubscr...@flink.apache.org Best, Jane On Fri, Feb 24, 2023 at 7:43 PM LITA LITA wrote: > 退订 > > <704669...@qq.com.invalid> 于2023年2月24日周五 07:58写道: > > > 退订 > > > > >

Re: 使用flink sql 将kafka的数据同步到mysql无法删除。

2023-02-25 文章 Jane Chan
Hi, 原问题中 String 变量 kafka 和 mysql 赋值反了, 以及能提供下所使用的 flink 版本吗, 我使用 1.16.1 没有复现此问题 payload { "before": { "rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d", "63f73b332e77497da91286f0": "Jerry", "63f73b3f2e77497da91286fb": "mobile number", "63f73b3f2e77497da91286fc": "telephone

Re:

2023-02-24 文章 LITA LITA
退订 <704669...@qq.com.invalid> 于2023年2月24日周五 07:58写道: > 退订 > >

<    6   7   8   9   10   11   12   13   14   15   >