回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章
我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 不满足需求 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年03月8日 18:23,Zhanghao Chen 写道: 你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从

回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章
”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在 SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid, JobGraph可以拿到source和sink的链接信息和flinkJobid? | | 阿华田 | | a15733178...@163.com | JobGraph 可以获得 transformation 信息transformation 签名由网易邮箱大师定制 在2024年03月8

回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-07 文章
获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月26日 20:04,Feng Jin 写道: 通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris Sink,之后再通过反射获取里面的 properties 信息进行提取。 可以参考 OpenLineage[1] 的实现. 1. https://github.com

回复: Flink任务链接信息审计获取

2024-02-03 文章
看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月2日 19:38,Feng Jin 写道: hi, 可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析 Source 和 Sink 拿到血缘信息。 [1] https://github.com/OpenLineage/OpenLineage/blob

回复: Flink任务链接信息审计获取

2024-02-03 文章
好的 感谢 我研究一下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月2日 19:38,Feng Jin 写道: hi, 可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析 Source 和 Sink 拿到血缘信息。 [1] https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io

Flink任务链接信息审计获取

2024-02-02 文章
打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: flink ui 算子数据展示一直loading...

2024-01-25 文章
这个维度都排查了 都正常 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年01月23日 21:57,Feng Jin 写道: 可以尝试着下面几种方式确认下原因: 1. 打开浏览器开发者模式,看是否因为请求某个接口卡住 2. 查看下 JobManager 的 GC 情况,是否频繁 FullGC 3. 查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问. Best, Feng On Tue, Jan 23, 2024 at 6:16 PM 阿华田

flink ui 算子数据展示一直loading...

2024-01-23 文章
如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

kafka_appender收集flink任务日志连接数过多问题

2023-10-16 文章
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了三千个,运行的yarn-container有20万+。导致存储日志的kafka集群连接数过多,kafka集群压力有点大,请教各位大佬flink日志采集这块还有什么好的方式吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

如何获取flink任务 source 和sink的所链接的中间件的ip信息

2023-09-22 文章
本人目前再做实时计算平台,想审计平台上所有运行flink任务的中间件的连接信息。比如job1 是kafka写入hbase的flink任务,希望可以自动审计此任务所连接的kafka的集群ip地址和topic以及hbase的集群ip地址和表信息 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: flink-job-history 任务太多页面卡死

2023-07-28 文章
这个解决不了根本问题 主要是我们的任务比较多,业务上就需要保留几千个任务 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2023年07月28日 11:28,Shammon FY 写道: Hi, 可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1] [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full

flink-job-history 任务太多页面卡死

2023-07-27 文章
目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复:flink on k8s 任务状态监控问题

2023-07-16 文章
history server是不是有延迟性 做不到实时获取任务的状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2023年07月15日 12:14,casel.chen 写道: 可以查看history server 在 2023-07-14 18:36:42,"阿华田" 写道: hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 无法判断flink任务是正常Finished 还是异常失败

flink on k8s 任务状态监控问题

2023-07-14 文章
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

flinksql如何设置es 字段属性

2021-07-17 文章
使用flinksql写入es是 index是动态的,所有想使用flinksql直接设置字段的属性,比如not_analyzed。目前发现好像只能设置字段的类型,大佬们有知道怎么设置字段的属性吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

自定义带有状态的udf

2021-06-01 文章
initializeStateis started "); } | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

如何自定义带有状态的UDF

2021-06-01 文章
ublic void initializeState(FunctionInitializationContext context) throws Exception { mapStateDescriptor = new MapStateDescriptor<>( "app-status-map", String.class, Integer.class); mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor); LOGGER.info("the initializeState

回复: yarn.containers.vcores使用问题

2021-03-04 文章
已经可以了 flink1.11 在jobManger的ui页面配置信息那块能看到这个参数是否配置成功 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2021年03月5日 11:44,Xintong Song 写道: 你的 flink 是什么版本? 部署模式是 per-job 还是 session? “看到任务配置参数也生效了”具体是在哪里看到的? Thank you~ Xintong Song On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote: 使用-yD

yarn.containers.vcores使用问题

2021-03-04 文章
使用-yD yarn.containers.vcores=4 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: flinksql集成hive权限管理

2021-02-26 文章
Ok 感谢 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2021年02月26日 15:29,Rui Li 写道: 你好, 目前hive connector还没有支持ranger,只支持HMS端基于storage的权限控制。 On Thu, Feb 25, 2021 at 8:49 PM 阿华田 wrote: 目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权

flinksql集成hive权限管理

2021-02-25 文章
目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink clickhouse connector

2021-02-02 文章
. | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

FOR SYSTEM_TIME AS OF 维表关联 报错

2021-01-29 文章
-> STREAM_PHYSICAL] There is 1 empty subset: rel#280:Subset#14.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink-sql-gateway如何使用flink自定义的udf

2021-01-26 文章
各位大佬,Flink-sql-gateway 提交flink sql任务 ,如何使用flink自定义的udf | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复:flink1.11 DDL定义kafka source报错

2020-08-07 文章
"'update-mode' = 'append')"; tableEnv.executeSql(create_sql); Table table = tableEnv.sqlQuery("select name from test "); TableSchema schema = table.getSchema(); System.out.println(schema); DataStream> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(); tableEnv.execute("test"); //bsEnv.execute("fff"); } } | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年08月7日 13:49,阿华田 写道: 代码如下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink1.11 DDL定义kafka source报错

2020-08-06 文章
代码如下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink读取kafka超时问题

2020-06-28 文章
broker造成了很大的负载导致请求超时? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: FileInputFormat 使用问题

2020-06-20 文章
解决了 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 17:56,john<506269...@qq.com> 写道: 嗨,找到问题了吗?我也遇到了 2020年6月1日 下午2:48,阿华田 写道: //初始化任务参数 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path

env.readFile 递归读取hdfs,临时文件不存在问题

2020-06-17 文章
使用flink读取递归读取hdfs文件,报.tmp结尾的文件不存在异常,正常这些tmp文件flink应该不用读取吧? File does not exist: /recommender/success_fid_flow/ds=2020-06-16/hour=14/2020-06-16_14.success_fid_jarvis-01.1592287200578.tmp | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复:关于多个来源,如何保证数据对齐

2020-06-15 文章
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache, com.google.common.cache; | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月15日 14:41,steven chen 写道: hi: 1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。 2. 有topic a 是所有的用户pv日志, topic b 是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的

回复:flink on yarn报错 怎么获取

2020-06-02 文章
对 获取flink任务的metric信息 主要是任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:21,air23 写道: 分钟级别定时去获取metrics? 这样吗 在 2020-06-02 14:05:39,"阿华田" 写道: 这种情况需要对flink任务进行监控 获取flink的任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:03,air2

回复:flink on yarn报错 怎么获取

2020-06-02 文章
这种情况需要对flink任务进行监控 获取flink的任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:03,air23 写道: 今天发现taskmanagers报json解析失败 他一起在重启 但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题 谢谢

FileInputFormat 使用问题

2020-06-01 文章
使用FileInputFormat 递归读取hdfs文件,并添加过滤器。程序执行没有报错但是很快就执行完成也没有读取到数据,本地测试可以过滤并读取到数据,yarn集群上执行出现上述情况。 代码: //初始化任务参数 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));

回复:recursive.file.enumeration使用问题

2020-05-28 文章
说明一下 读取的数据还没有到今天的数据 也就是提示文件不存在的目录xxx/ds=2020-05-28 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年05月28日 16:36,阿华田 写道: 使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误: java.io.IOException: Error opening the InputSplit hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15

recursive.file.enumeration使用问题

2020-05-28 文章
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误: java.io.IOException: Error opening the InputSplit hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/ | | 王志华 | | a15733178...@163.com |

flink正则读取hdfs目录下的文件

2020-05-21 文章
input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" % ('2018-07-16’) result = sc.textFile(input_data) flink可以像spark一样正则读取hdfs目录下的文件吗?目前测试好像不行,如果不支持,最早什么版本会支持呢? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: flink如何正则读取hdfs下的文件

2020-05-21 文章
onnectors/streamfile_sink.html Best, Jingsong Lee On Thu, May 21, 2020 at 10:59 AM jimandlice wrote: flink 写入hive 使用api 思路是怎么的呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 10:57,阿华田 写道: flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件

flink如何正则读取hdfs下的文件

2020-05-20 文章
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复: flink背压问题

2020-04-28 文章
好的 感谢 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 10:29,Junzhong Qin 写道: 可以试一下Jsoniter, https://jsoniter.com/index.cn.html 阿华田 于2020年4月29日周三 上午10:07写道: 这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson | | 王志华 | | a15733178...@163.com | 签名由

回复: flink背压问题

2020-04-28 文章
这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 10:02,LakeShen 写道: Hi 阿华, 数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。 可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。 Best, LakeShen 阿华田 于2020年

回复: flink背压问题

2020-04-28 文章
、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况 Best ! zhisheng 阿华田 于2020年4月28日周二 上午9:37写道: 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink背压问题

2020-04-27 文章
线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

WechatIMG169

2020-04-16 文章
在代码中给算子添加了名称,但是flink的ui上还是显示原始的名称,这种情况大佬们遇见过吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

回复:自定义具有Exactly-Once语义的sink

2020-04-14 文章
 谢谢 | | 王志华 | | 邮箱:a15733178...@163.com | 签名由 网易邮箱大师 定制 在2020年04月15日 11:44,zhang...@lakala.com 写道: 昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。 “Flink 端到端 Exactly-once 机制剖析” https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g 发件人: 阿华田 发送时间: 2020-04-15 11:00 收件人: user-zh@flink.apache.org 主题: 自定义具有Exactly

自定义具有Exactly-Once语义的sink

2020-04-14 文章
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制